From 67b662f02665fd9fcc5f6684a362ba95c587aab2 Mon Sep 17 00:00:00 2001 From: Richard Park Date: Thu, 17 Jun 2021 17:14:14 -0700 Subject: [PATCH 1/7] - The max time per test was lowered accidentally. Bringing it back to what's been used as the standard time in other libraries - Fixes to simplify the flaky tests. Mostly just reducing the number of non-deterministic events (connections restarting, etc..) that were being heavily depended on, and just changing them to manually initiate the events we need instead (ie, force detach). - Fixed a spot in receiveMessages() where, if the link had been closed, we'd falsly return an empty array instead of throwing an exception indicating the link closed. --- sdk/servicebus/service-bus/package.json | 4 +- .../service-bus/src/core/batchingReceiver.ts | 2 +- .../internal/backupMessageSettlement.spec.ts | 37 ++- .../test/internal/batchReceiver.spec.ts | 213 +++++++++++++----- .../test/internal/unit/receiver.spec.ts | 9 +- .../service-bus/test/internal/utils/misc.ts | 11 + 6 files changed, 208 insertions(+), 68 deletions(-) diff --git a/sdk/servicebus/service-bus/package.json b/sdk/servicebus/service-bus/package.json index 3b30d1215237..c2879a0db183 100644 --- a/sdk/servicebus/service-bus/package.json +++ b/sdk/servicebus/service-bus/package.json @@ -60,7 +60,7 @@ "extract-api": "tsc -p . && api-extractor run --local", "format": "prettier --write --config ../../../.prettierrc.json --ignore-path ../../../.prettierignore \"samples/**/*.{ts,js}\" \"src/**/*.ts\" \"test/**/*.ts\" \"samples-dev/**/*.ts\" \"*.{js,json}\"", "integration-test:browser": "karma start --single-run", - "integration-test:node": "nyc mocha -r esm --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 120000 --full-trace \"dist-esm/test/internal/**/*.spec.js\" \"dist-esm/test/public/**/*.spec.js\"", + "integration-test:node": "nyc mocha -r esm --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 1200000 --full-trace \"dist-esm/test/internal/**/*.spec.js\" \"dist-esm/test/public/**/*.spec.js\"", "integration-test": "npm run integration-test:node && npm run integration-test:browser", "lint:fix": "eslint package.json api-extractor.json src test --ext .ts --fix --fix-type [problem,suggestion]", "lint": "eslint package.json api-extractor.json src test --ext .ts -f html -o service-bus-lintReport.html || exit 0", @@ -71,7 +71,7 @@ "test:node": "npm run clean && npm run build:test:node && npm run integration-test:node", "test": "npm run test:node && npm run test:browser", "unit-test:browser": "echo skipped", - "unit-test:node": "mocha -r esm -r ts-node/register --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 120000 --full-trace \"test/internal/unit/*.spec.ts\" \"test/internal/node/*.spec.ts\"", + "unit-test:node": "mocha -r esm -r ts-node/register --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 1200000 --full-trace \"test/internal/unit/*.spec.ts\" \"test/internal/node/*.spec.ts\"", "unit-test": "npm run unit-test:node && npm run unit-test:browser", "docs": "typedoc --excludePrivate --excludeNotExported --excludeExternals --stripInternal --mode file --out ./dist/docs ./src" }, diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index 0e9ade3842ec..a25f659bdb42 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -281,7 +281,7 @@ export class BatchingReceiverLite { if (receiver == null) { // (was somehow closed in between the init() and the return) - return []; + throw new ServiceBusError("Link closed before receiving.", "GeneralError"); } const messages = await new Promise((resolve, reject) => diff --git a/sdk/servicebus/service-bus/test/internal/backupMessageSettlement.spec.ts b/sdk/servicebus/service-bus/test/internal/backupMessageSettlement.spec.ts index 811d2853f12a..5ea8ad655c10 100644 --- a/sdk/servicebus/service-bus/test/internal/backupMessageSettlement.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/backupMessageSettlement.spec.ts @@ -19,6 +19,7 @@ import { ServiceBusMessageImpl, ServiceBusReceivedMessage } from "../../src/serviceBusMessage"; +import { disableCommonLoggers, enableCommonLoggers, testLogger } from "./utils/misc"; const should = chai.should(); chai.use(chaiAsPromised); @@ -256,10 +257,17 @@ describe("Message settlement After Receiver is Closed - Through ManagementLink", const testMessages = entityNames.usesSessions ? TestMessage.getSessionSample() : TestMessage.getSample(); + + testLogger.info(`sending (and receiving) initial message`); + const msg = await sendReceiveMsg(testMessages); + testLogger.info(`Done sending initial messages`); + const msgDeliveryLink = (msg as ServiceBusMessageImpl).delivery.link.name; + testLogger.info(`About to close the underlying link.`); + if (entityNames.usesSessions) { await (receiver as ServiceBusReceiverImpl)["_context"].messageSessions[ msgDeliveryLink @@ -270,10 +278,18 @@ describe("Message settlement After Receiver is Closed - Through ManagementLink", ].close(); } + testLogger.info( + `Underlying link should be closed: ${receiver.isClosed}. This will force us to use the management link to settle. Will now attempt to dead letter.` + ); + let errorWasThrown = false; try { await receiver.deadLetterMessage(msg); + + testLogger.info(`Message has been dead lettered`); } catch (err) { + testLogger.error(`Exception thrown`, err); + should.equal( err.message, `Failed to ${DispositionType.deadletter} the message as the AMQP link with which the message was received is no longer alive.`, @@ -288,6 +304,9 @@ describe("Message settlement After Receiver is Closed - Through ManagementLink", should.equal(errorWasThrown, false, "Error was thrown for sessions without session-id"); } + testLogger.info( + `Creating a peek lock dead letter receiver and attempting to receive the dead lettered message` + ); receiver = await serviceBusClient.test.createPeekLockReceiver(entityNames); if (!entityNames.usesSessions) { @@ -310,22 +329,30 @@ describe("Message settlement After Receiver is Closed - Through ManagementLink", "MessageId is different than expected" ); + testLogger.info(`Attempting to complete the message: ${deadLetterMsgsBatch[0].messageId}`); await receiver.completeMessage(deadLetterMsgsBatch[0]); - await testPeekMsgsLength(deadLetterReceiver, 0); } else { const messageBatch = await receiver.receiveMessages(1); - await receiver.completeMessage(messageBatch[0]); + testLogger.info(`Attempting to complete the message: ${messageBatch[0].messageId}`); + await receiver.completeMessage(messageBatch[0]); await testPeekMsgsLength(receiver, 0); } + + testLogger.info(`Done testing dead letter`); } - it( + it.only( noSessionTestClientType + ": deadLetter() moves message to deadletter queue", async function(): Promise { - await beforeEachTest(noSessionTestClientType); - await testDeadletter(); + enableCommonLoggers(); + try { + await beforeEachTest(noSessionTestClientType); + await testDeadletter(); + } finally { + disableCommonLoggers(); + } } ); diff --git a/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts index 60c72f14c7d5..9b7bbc5f04f5 100644 --- a/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts @@ -4,7 +4,13 @@ import chai from "chai"; import Long from "long"; import chaiAsPromised from "chai-as-promised"; -import { ServiceBusMessage, delay, ServiceBusSender, ServiceBusReceivedMessage } from "../../src"; +import { + ServiceBusMessage, + delay, + ServiceBusSender, + ServiceBusReceivedMessage, + ServiceBusError +} from "../../src"; import { InvalidOperationForPeekedMessage } from "../../src/util/errors"; import { TestClientType, TestMessage } from "../public/utils/testUtils"; import { ServiceBusReceiver, ServiceBusReceiverImpl } from "../../src/receivers/receiver"; @@ -18,14 +24,15 @@ import { getRandomTestClientTypeWithSessions } from "../public/utils/testutils2"; import { AbortController } from "@azure/abort-controller"; -import { Receiver, ReceiverEvents } from "rhea-promise"; +import { Receiver } from "rhea-promise"; import { ServiceBusSessionReceiver, ServiceBusSessionReceiverImpl } from "../../src/receivers/sessionReceiver"; -import { ConnectionContext } from "../../src/connectionContext"; import { LinkEntity } from "../../src/core/linkEntity"; -import { StandardAbortMessage } from "@azure/core-amqp"; +import { Constants, StandardAbortMessage } from "@azure/core-amqp"; +import { BatchingReceiver } from "../../src/core/batchingReceiver"; +import { disableCommonLoggers, enableCommonLoggers, testLogger } from "./utils/misc"; const should = chai.should(); chai.use(chaiAsPromised); @@ -807,37 +814,15 @@ describe("Batching Receiver", () => { }); describe("Batch Receiver - disconnects", () => { - function simulateDisconnectDuringDrain( - receiverContext: ConnectionContext, - batchingReceiver: LinkEntity | undefined, - didRequestDrainResolver: Function - ) { - if (!batchingReceiver || !batchingReceiver.isOpen()) { - throw new Error(`batchingReceiver is not open or passed undefined.`); - } - // We want to simulate a disconnect once the batching receiver is draining. - // We can detect when the receiver enters a draining state when `addCredit` is - // called while didRequestDrainResolver is called to resolve the promise. - const addCredit = batchingReceiver["link"]!.addCredit; - batchingReceiver["link"]!.addCredit = function(credits) { - // This makes sure the receiveMessages doesn't end because of draining before the disconnect is triggered - // Meaning.. the "resolving the messages" can only happen through the onDetached triggered by disconnect - batchingReceiver["link"]!.removeAllListeners(ReceiverEvents.receiverDrained); - addCredit.call(this, credits); - if (batchingReceiver["link"]!.drain) { - didRequestDrainResolver(); - // Simulate a disconnect being called with a non-retryable error. - receiverContext.connection["_connection"].idle(); - } - }; - } - - describe(noSessionTestClientType + ": Batch Receiver - disconnects", function(): void { + describe("Batch Receiver - disconnects (non-session)", function(): void { before(() => { + enableCommonLoggers(); + console.log(`Entity type: ${noSessionTestClientType}`); serviceBusClient = createServiceBusClientForTests(); }); after(() => { + disableCommonLoggers(); return serviceBusClient.test.after(); }); @@ -886,41 +871,95 @@ describe("Batching Receiver", () => { refreshConnectionCalled.should.be.greaterThan(0, "refreshConnection was not called."); }); - it("returns messages if drain is in progress (receiveAndDelete)", async function(): Promise< + // + // TODO: this is the one that appears to consistently fail. + // + // + // + // + + // it.only("returns messages if drain is in progress (receiveAndDelete)", async function(): Promise< + // void + // > { + it.only("returns messages if drain is in progress (receiveAndDelete)", async function(): Promise< void > { // Create the sender and receiver. + + testLogger.info("Before the test"); + await beforeEachTest(noSessionTestClientType, "receiveAndDelete"); // The first time `receiveMessages` is called the receiver link is created. // The `receiver_drained` handler is only added after the link is created, // which is a non-blocking task. - await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); - const receiverContext = (receiver as ServiceBusReceiverImpl)["_context"]; + + testLogger.info( + "Receiving a single message to warm up receiver (there isn't one, so this should just time out)" + ); + + await receiver.receiveMessages(1); + + testLogger.info("After receiving our non-existent warmup message"); + const batchingReceiver = (receiver as ServiceBusReceiverImpl)["_batchingReceiver"]; + testLogger.info("Sending first message"); + // Send a message so we have something to receive. await sender.sendMessages(TestMessage.getSample()); - const didRequestDrain = new Promise((resolve) => { - simulateDisconnectDuringDrain(receiverContext, batchingReceiver, resolve); - }); + const { onDetachedCalledPromise } = causeDisconnectDuringDrain(batchingReceiver); + + testLogger.info("Receiving first message + 9 more (forces a drain to happen)"); // Purposefully request more messages than what's available // so that the receiver will have to drain. - const messages1 = await receiver.receiveMessages(10, { maxWaitTimeInMs: 1000 }); + const messages1 = await receiver.receiveMessages(10); + + testLogger.info( + `Receiving done, got ${messages1.length} messages, now waiting for detach event since we forced a .idle()` + ); + + const result = await Promise.all([ + onDetachedCalledPromise, + delay( + Constants.defaultOperationTimeoutInMs * 1.5, + undefined, + undefined, + "ondetachednevercalled" + ) + ]); + + if (typeof result === "string" && result === "ondetachednevercalled") { + assert.fail("ondetached was never called for the receiver"); + } - await didRequestDrain; messages1.length.should.equal(1, "Unexpected number of messages received."); // Make sure that a 2nd receiveMessages call still works // by sending and receiving a single message again. - await sender.sendMessages(TestMessage.getSample()); + const sampleMessage = TestMessage.getSample(); + + testLogger.info( + "Sending another sample message for our 'receiver after interrupted batch receiver' receiver" + ); + + await sender.sendMessages(sampleMessage); + + testLogger.info("Message sent, now attempting to receive"); // wait for the 2nd message to be received. - const messages2 = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); + // NOTE: we've forced the connection to restart at this point - it's quite possible to get errors + // while we attempt to receive, so we need to handle that. + const messages2 = await receiver.receiveMessages(1); - messages2.length.should.equal(1, "Unexpected number of messages received."); + testLogger.info("Messages received: ${messages2.length}"); + + assert.deepEqual( + messages2!.map((msg) => msg.body), + [sampleMessage.body] + ); }); it("throws an error if drain is in progress (peekLock)", async function(): Promise { @@ -931,15 +970,12 @@ describe("Batching Receiver", () => { // The `receiver_drained` handler is only added after the link is created, // which is a non-blocking task. await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); - const receiverContext = (receiver as ServiceBusReceiverImpl)["_context"]; const batchingReceiver = (receiver as ServiceBusReceiverImpl)["_batchingReceiver"]; // Send a message so we have something to receive. await sender.sendMessages(TestMessage.getSample()); - const didRequestDrain = new Promise((resolve) => { - simulateDisconnectDuringDrain(receiverContext, batchingReceiver, resolve); - }); + const { onDetachedCalledPromise } = causeDisconnectDuringDrain(batchingReceiver); // Purposefully request more messages than what's available // so that the receiver will have to drain. @@ -951,7 +987,7 @@ describe("Batching Receiver", () => { err.message && err.message.should.not.equal(testFailureMessage); } - await didRequestDrain; + await onDetachedCalledPromise; // Make sure that a 2nd receiveMessages call still works // by sending and receiving a single message again. @@ -1045,11 +1081,15 @@ describe("Batching Receiver", () => { }); }); - describe(withSessionTestClientType + ": Batch Receiver - disconnects", function(): void { + describe("Session Batch Receiver - disconnects (sessions)", function(): void { let serviceBusClient: ServiceBusClientForTests; let sender: ServiceBusSender; let receiver: ServiceBusSessionReceiver; + before(() => { + console.log(`Entity type: ${withSessionTestClientType}`); + }); + async function beforeEachTest( receiveMode: "peekLock" | "receiveAndDelete" = "peekLock" ): Promise { @@ -1136,21 +1176,19 @@ describe("Batching Receiver", () => { "Unexpected number of received messages(before disconnect)." ); - const receiverContext = (receiver as ServiceBusSessionReceiverImpl)["_context"]; const batchingReceiver = (receiver as ServiceBusSessionReceiverImpl)["_messageSession"]; // Send a message so we have something to receive. await sender.sendMessages(TestMessage.getSessionSample()); - const didRequestDrain = new Promise((resolve) => { - simulateDisconnectDuringDrain(receiverContext, batchingReceiver, resolve); - }); + const { onDetachedCalledPromise } = causeDisconnectDuringDrain(batchingReceiver); // Purposefully request more messages than what's available // so that the receiver will have to drain. const messages2 = await receiver.receiveMessages(10); - await didRequestDrain; + await onDetachedCalledPromise; + messages2.length.should.equal( 1, "Unexpected number of messages received(during disconnect)." @@ -1196,15 +1234,14 @@ describe("Batching Receiver", () => { "Unexpected number of received messages(before disconnect)." ); - const receiverContext = (receiver as ServiceBusSessionReceiverImpl)["_context"]; const batchingReceiver = (receiver as ServiceBusSessionReceiverImpl)["_messageSession"]; // Send a message so we have something to receive. await sender.sendMessages(TestMessage.getSessionSample()); - const didRequestDrain = new Promise((resolve) => { - simulateDisconnectDuringDrain(receiverContext, batchingReceiver, resolve); - }); + const { onDetachedCalledPromise: drainRequestedPromise } = causeDisconnectDuringDrain( + batchingReceiver + ); // Purposefully request more messages than what's available // so that the receiver will have to drain. @@ -1218,7 +1255,7 @@ describe("Batching Receiver", () => { err.message.should.not.equal(testFailureMessage); } - await didRequestDrain; + await drainRequestedPromise; }); it("returns messages if receive in progress (receiveAndDelete)", async function(): Promise< @@ -1329,3 +1366,65 @@ describe("Batching Receiver", () => { }); }); }); + +/** + * Sets `batchingReceiver` so it's next drain call will result in the connection recycling (via connection.idle()). + * + * The `onDetachedCalledPromise` property in the return object allows you to await until the batching receiver has + * actually been detached. + * + * @param receiverContext An active receiver context we can use to call _connection.idle() + * @param batchingReceiver A batching receiver (minimal interface compatible with sessions and non-sessions) + * @returns an object with `onDetachedCalledPromise` that resolves when onDetach has completed + * for the batching receiver. + */ +function causeDisconnectDuringDrain( + batchingReceiver: (Pick & LinkEntity) | undefined +): { onDetachedCalledPromise: Promise } { + let resolveOnDetachedCallPromise: () => void; + + let onDetachedCalledPromise = new Promise((resolve) => { + resolveOnDetachedCallPromise = resolve; + }); + + if (!batchingReceiver || !batchingReceiver.isOpen()) { + throw new Error(`batchingReceiver is not open or passed undefined.`); + } + + const link = batchingReceiver["link"]; + + if (link == null) { + throw new Error("No active link for batching receiver"); + } + + const origAddCredit = link.addCredit; + + // We want to simulate a disconnect once the batching receiver is draining. + // We can detect when the receiver enters a draining state when `addCredit` is + // called while didRequestDrainResolver is called to resolve the promise. + const addCreditThatImmediatelyDetaches = function(credits: number): void { + origAddCredit.call(link, credits); + + if (link.drain && credits === 1) { + // only need to prevent that first drain from happening in the tests + // that use this "trick". + link["addCredit"] = origAddCredit; + + // initiate the detach now (prior to any possibilty of the 'drain' call being scheduled) + batchingReceiver + .onDetached( + new ServiceBusError( + "Test: purposefully disconnecting receiver before drain", + "GeneralError" + ) + ) + .then(() => resolveOnDetachedCallPromise()); + } + }; + + link["addCredit"] = addCreditThatImmediatelyDetaches; + + return { + onDetachedCalledPromise + }; +} diff --git a/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts index 0e10d6bf7ada..c6f0c2f6003d 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts @@ -21,6 +21,7 @@ import { AbortSignalLike } from "@azure/abort-controller"; import { ServiceBusSessionReceiverImpl } from "../../../src/receivers/sessionReceiver"; import { MessageSession } from "../../../src/session/messageSession"; import sinon from "sinon"; +import { assertThrows } from "../../public/utils/testUtils"; describe("Receiver unit tests", () => { describe("init() and close() interactions", () => { @@ -43,9 +44,11 @@ describe("Receiver unit tests", () => { }; // make an init() happen internally. - const emptyArrayOfMessages = await batchingReceiver.receive(1, 1, 1, {}); - - assert.isEmpty(emptyArrayOfMessages); + await assertThrows(() => batchingReceiver.receive(1, 1, 1, {}), { + name: "ServiceBusError", + code: "GeneralError", + message: "Link closed before receiving." + }); assert.isTrue(initWasCalled); }); diff --git a/sdk/servicebus/service-bus/test/internal/utils/misc.ts b/sdk/servicebus/service-bus/test/internal/utils/misc.ts index 06b52e521046..41cea80aeda0 100644 --- a/sdk/servicebus/service-bus/test/internal/utils/misc.ts +++ b/sdk/servicebus/service-bus/test/internal/utils/misc.ts @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. +import { createClientLogger, setLogLevel } from "@azure/logger"; import { Delivery, ServiceBusReceivedMessage } from "../../../src"; import { ServiceBusMessageImpl } from "../../../src/serviceBusMessage"; @@ -17,3 +18,13 @@ export function getDeliveryProperty(message: ServiceBusReceivedMessage): Deliver "Received message does not contain a .delivery member - not a ServiceBusMessageImpl instance." ); } + +export const testLogger = createClientLogger("test"); + +export function enableCommonLoggers() { + setLogLevel("verbose"); +} + +export function disableCommonLoggers() { + setLogLevel(); +} From 755d26f71da06e4aaa40f6cf26d11044d03e6a86 Mon Sep 17 00:00:00 2001 From: Richard Park Date: Fri, 18 Jun 2021 10:22:13 -0700 Subject: [PATCH 2/7] Remove .only's --- .../test/internal/backupMessageSettlement.spec.ts | 2 +- .../service-bus/test/internal/batchReceiver.spec.ts | 12 +----------- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/sdk/servicebus/service-bus/test/internal/backupMessageSettlement.spec.ts b/sdk/servicebus/service-bus/test/internal/backupMessageSettlement.spec.ts index 5ea8ad655c10..a9e958cd6e10 100644 --- a/sdk/servicebus/service-bus/test/internal/backupMessageSettlement.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/backupMessageSettlement.spec.ts @@ -343,7 +343,7 @@ describe("Message settlement After Receiver is Closed - Through ManagementLink", testLogger.info(`Done testing dead letter`); } - it.only( + it( noSessionTestClientType + ": deadLetter() moves message to deadletter queue", async function(): Promise { enableCommonLoggers(); diff --git a/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts index 9b7bbc5f04f5..060891b0f7d8 100644 --- a/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts @@ -871,17 +871,7 @@ describe("Batching Receiver", () => { refreshConnectionCalled.should.be.greaterThan(0, "refreshConnection was not called."); }); - // - // TODO: this is the one that appears to consistently fail. - // - // - // - // - - // it.only("returns messages if drain is in progress (receiveAndDelete)", async function(): Promise< - // void - // > { - it.only("returns messages if drain is in progress (receiveAndDelete)", async function(): Promise< + it("returns messages if drain is in progress (receiveAndDelete)", async function(): Promise< void > { // Create the sender and receiver. From 9ef68d55ad504c21cf94238cb633bf2b64e734cf Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Fri, 18 Jun 2021 10:24:10 -0700 Subject: [PATCH 3/7] Make error message more clear (suggestion via the chradek) Co-authored-by: chradek <51000525+chradek@users.noreply.github.com> --- sdk/servicebus/service-bus/src/core/batchingReceiver.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index a25f659bdb42..b298f4b4cbc2 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -281,7 +281,7 @@ export class BatchingReceiverLite { if (receiver == null) { // (was somehow closed in between the init() and the return) - throw new ServiceBusError("Link closed before receiving.", "GeneralError"); + throw new ServiceBusError("Link closed before receiving messages.", "GeneralError"); } const messages = await new Promise((resolve, reject) => From 10f6849f2a9a19d5247d8ff1931d50d6f8f1728c Mon Sep 17 00:00:00 2001 From: Richard Park Date: Fri, 18 Jun 2021 10:27:47 -0700 Subject: [PATCH 4/7] Updating doc comment to be accurate (and removing parameter that no longer exists) --- .../service-bus/test/internal/batchReceiver.spec.ts | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts index 060891b0f7d8..2d56df39d007 100644 --- a/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts @@ -1358,14 +1358,16 @@ describe("Batching Receiver", () => { }); /** - * Sets `batchingReceiver` so it's next drain call will result in the connection recycling (via connection.idle()). + * Sets `batchingReceiver` so it's next drain call will result in the connection recycling prior to the + * drain completing. The primary use is just to make sure that when we terminate a receiveMessages() call + * early due to a disconnect (and interrupt the drain) that the link is restored and can be used + * again afterwards. * - * The `onDetachedCalledPromise` property in the return object allows you to await until the batching receiver has - * actually been detached. + * The `onDetachedCalledPromise` property in the return object allows you to await until the batching + * receiver has actually been detached. * - * @param receiverContext An active receiver context we can use to call _connection.idle() * @param batchingReceiver A batching receiver (minimal interface compatible with sessions and non-sessions) - * @returns an object with `onDetachedCalledPromise` that resolves when onDetach has completed + * @returns an object with `onDetachedCalledPromise` that resolves when onDetached has completed * for the batching receiver. */ function causeDisconnectDuringDrain( From 43f118dfd82c768c22866eaaed04a500f68ba658 Mon Sep 17 00:00:00 2001 From: Richard Park Date: Fri, 18 Jun 2021 10:59:59 -0700 Subject: [PATCH 5/7] Updated error message, needed to update unit test! --- sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts index c6f0c2f6003d..f8b3080d9bdf 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts @@ -47,7 +47,7 @@ describe("Receiver unit tests", () => { await assertThrows(() => batchingReceiver.receive(1, 1, 1, {}), { name: "ServiceBusError", code: "GeneralError", - message: "Link closed before receiving." + message: "Link closed before receiving messages." }); assert.isTrue(initWasCalled); }); From 37479cb87723aebb59c432b7e3256db48ea39e94 Mon Sep 17 00:00:00 2001 From: Richard Park Date: Fri, 18 Jun 2021 16:13:14 -0700 Subject: [PATCH 6/7] Fixing two kinds of failures: - The "open a receiver on a non-existent entity" should wait a bit longer since establishing the connection can be slow if network conditions are bad. - Updating to throw a more realistic error inside of the onDetached() call (it'll be a non-retryable exception, not a ServiceBusError like I used). --- .../test/internal/batchReceiver.spec.ts | 46 ++++++++----------- .../test/internal/serviceBusClient.spec.ts | 7 ++- 2 files changed, 24 insertions(+), 29 deletions(-) diff --git a/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts index 2d56df39d007..ff18a2b1c2fc 100644 --- a/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts @@ -4,13 +4,7 @@ import chai from "chai"; import Long from "long"; import chaiAsPromised from "chai-as-promised"; -import { - ServiceBusMessage, - delay, - ServiceBusSender, - ServiceBusReceivedMessage, - ServiceBusError -} from "../../src"; +import { ServiceBusMessage, delay, ServiceBusSender, ServiceBusReceivedMessage } from "../../src"; import { InvalidOperationForPeekedMessage } from "../../src/util/errors"; import { TestClientType, TestMessage } from "../public/utils/testUtils"; import { ServiceBusReceiver, ServiceBusReceiverImpl } from "../../src/receivers/receiver"; @@ -971,10 +965,13 @@ describe("Batching Receiver", () => { // so that the receiver will have to drain. const testFailureMessage = "Test failure"; try { - await receiver.receiveMessages(10, { maxWaitTimeInMs: 1000 }); + await receiver.receiveMessages(10); throw new Error(testFailureMessage); } catch (err) { - err.message && err.message.should.not.equal(testFailureMessage); + assert.deepNestedInclude(err, { + name: "Error", + message: "Test: fake connection failure" + }); } await onDetachedCalledPromise; @@ -984,7 +981,7 @@ describe("Batching Receiver", () => { await sender.sendMessages(TestMessage.getSample()); // wait for the 2nd message to be received. - const messages = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); + const messages = await receiver.receiveMessages(1); messages.length.should.equal(1, "Unexpected number of messages received."); }); @@ -1237,12 +1234,13 @@ describe("Batching Receiver", () => { // so that the receiver will have to drain. const testFailureMessage = "Test failure"; try { - await receiver.receiveMessages(10, { maxWaitTimeInMs: 5000 }); + await receiver.receiveMessages(10); throw new Error(testFailureMessage); } catch (err) { - err.message && - err.code.should.equal("SessionLockLost") && - err.message.should.not.equal(testFailureMessage); + assert.deepNestedInclude(err, { + name: "Error", + message: "Test: fake connection failure" + }); } await drainRequestedPromise; @@ -1345,12 +1343,13 @@ describe("Batching Receiver", () => { // so that the receiver will have to drain. const testFailureMessage = "Test failure"; try { - await receiver.receiveMessages(10, { maxWaitTimeInMs: 5000 }); + await receiver.receiveMessages(10); throw new Error(testFailureMessage); } catch (err) { - err.message && - err.code.should.equal("SessionLockLost") && - err.message.should.not.equal(testFailureMessage); + assert.deepNestedInclude(err, { + name: "Error", + message: "Test: fake connection failure" + }); } }); }); @@ -1398,18 +1397,9 @@ function causeDisconnectDuringDrain( origAddCredit.call(link, credits); if (link.drain && credits === 1) { - // only need to prevent that first drain from happening in the tests - // that use this "trick". - link["addCredit"] = origAddCredit; - // initiate the detach now (prior to any possibilty of the 'drain' call being scheduled) batchingReceiver - .onDetached( - new ServiceBusError( - "Test: purposefully disconnecting receiver before drain", - "GeneralError" - ) - ) + .onDetached(new Error("Test: fake connection failure")) .then(() => resolveOnDetachedCallPromise()); } }; diff --git a/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts b/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts index 1b2467a97266..6bf5b22250fa 100644 --- a/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts @@ -5,6 +5,7 @@ import { EnvironmentCredential } from "@azure/identity"; import chai from "chai"; import chaiAsPromised from "chai-as-promised"; import * as dotenv from "dotenv"; +import { Constants as CoreAmqpConstants } from "@azure/core-amqp"; import Long from "long"; import { isServiceBusError, @@ -294,7 +295,11 @@ describe("ServiceBusClient live tests", () => { }); should.equal( - await checkWithTimeout(() => errorWasThrown === true, 10, 3000), + await checkWithTimeout( + () => errorWasThrown === true, + 1000, + CoreAmqpConstants.defaultOperationTimeoutInMs * 2 // arbitrary, just don't want it to be too short. + ), true, "Error thrown flag must be true" ); From baa480ae1e688a27dc8a11fb6d045fbeb5b3893a Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Sat, 19 Jun 2021 00:44:38 +0000 Subject: [PATCH 7/7] - Fixing an error where I'd inadvertantly covered a compile problem (added when I was trying to allow use of a deprecated field) - Fixing assert to look for SessionLockLost error with peekLock+session disconnection tests. --- .../service-bus/test/internal/batchReceiver.spec.ts | 4 ++-- sdk/servicebus/service-bus/test/internal/tracing.spec.ts | 4 +--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts index ff18a2b1c2fc..a5ee9c1fa6bc 100644 --- a/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts @@ -1347,8 +1347,8 @@ describe("Batching Receiver", () => { throw new Error(testFailureMessage); } catch (err) { assert.deepNestedInclude(err, { - name: "Error", - message: "Test: fake connection failure" + name: "ServiceBusError", + code: "SessionLockLost" }); } }); diff --git a/sdk/servicebus/service-bus/test/internal/tracing.spec.ts b/sdk/servicebus/service-bus/test/internal/tracing.spec.ts index 25be1312348d..ef6d64e08351 100644 --- a/sdk/servicebus/service-bus/test/internal/tracing.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/tracing.spec.ts @@ -12,14 +12,12 @@ import { function legacyOptionsUsingSpanContext(rootSpan: TestSpan): Pick { return { - // @ts-ignore Using the deprecated field for testing - parentSpan: rootSpan.context() + parentSpan: rootSpan.spanContext() }; } function legacyOptionsUsingSpan(rootSpan: TestSpan): Pick { return { - // @ts-ignore Using the deprecated field for testing parentSpan: rootSpan }; }