diff --git a/sdk/servicebus/service-bus/package.json b/sdk/servicebus/service-bus/package.json index 5a1bf56fba09..e4934376d22e 100644 --- a/sdk/servicebus/service-bus/package.json +++ b/sdk/servicebus/service-bus/package.json @@ -60,7 +60,7 @@ "extract-api": "tsc -p . && api-extractor run --local", "format": "prettier --write --config ../../../.prettierrc.json --ignore-path ../../../.prettierignore \"samples/**/*.{ts,js}\" \"src/**/*.ts\" \"test/**/*.ts\" \"samples-dev/**/*.ts\" \"*.{js,json}\"", "integration-test:browser": "karma start --single-run", - "integration-test:node": "nyc mocha -r esm --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 120000 --full-trace \"dist-esm/test/internal/**/*.spec.js\" \"dist-esm/test/public/**/*.spec.js\"", + "integration-test:node": "nyc mocha -r esm --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 1200000 --full-trace \"dist-esm/test/internal/**/*.spec.js\" \"dist-esm/test/public/**/*.spec.js\"", "integration-test": "npm run integration-test:node && npm run integration-test:browser", "lint:fix": "eslint package.json api-extractor.json src test --ext .ts --fix --fix-type [problem,suggestion]", "lint": "eslint package.json api-extractor.json src test --ext .ts -f html -o service-bus-lintReport.html || exit 0", @@ -71,7 +71,7 @@ "test:node": "npm run clean && npm run build:test:node && npm run integration-test:node", "test": "npm run test:node && npm run test:browser", "unit-test:browser": "echo skipped", - "unit-test:node": "mocha -r esm -r ts-node/register --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 120000 --full-trace \"test/internal/unit/*.spec.ts\" \"test/internal/node/*.spec.ts\"", + "unit-test:node": "mocha -r esm -r ts-node/register --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 1200000 --full-trace \"test/internal/unit/*.spec.ts\" \"test/internal/node/*.spec.ts\"", "unit-test": "npm run unit-test:node && npm run unit-test:browser", "docs": "typedoc --excludePrivate --excludeNotExported --excludeExternals --stripInternal --mode file --out ./dist/docs ./src" }, diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index 0e9ade3842ec..b298f4b4cbc2 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -281,7 +281,7 @@ export class BatchingReceiverLite { if (receiver == null) { // (was somehow closed in between the init() and the return) - return []; + throw new ServiceBusError("Link closed before receiving messages.", "GeneralError"); } const messages = await new Promise((resolve, reject) => diff --git a/sdk/servicebus/service-bus/test/internal/backupMessageSettlement.spec.ts b/sdk/servicebus/service-bus/test/internal/backupMessageSettlement.spec.ts index 811d2853f12a..a9e958cd6e10 100644 --- a/sdk/servicebus/service-bus/test/internal/backupMessageSettlement.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/backupMessageSettlement.spec.ts @@ -19,6 +19,7 @@ import { ServiceBusMessageImpl, ServiceBusReceivedMessage } from "../../src/serviceBusMessage"; +import { disableCommonLoggers, enableCommonLoggers, testLogger } from "./utils/misc"; const should = chai.should(); chai.use(chaiAsPromised); @@ -256,10 +257,17 @@ describe("Message settlement After Receiver is Closed - Through ManagementLink", const testMessages = entityNames.usesSessions ? TestMessage.getSessionSample() : TestMessage.getSample(); + + testLogger.info(`sending (and receiving) initial message`); + const msg = await sendReceiveMsg(testMessages); + testLogger.info(`Done sending initial messages`); + const msgDeliveryLink = (msg as ServiceBusMessageImpl).delivery.link.name; + testLogger.info(`About to close the underlying link.`); + if (entityNames.usesSessions) { await (receiver as ServiceBusReceiverImpl)["_context"].messageSessions[ msgDeliveryLink @@ -270,10 +278,18 @@ describe("Message settlement After Receiver is Closed - Through ManagementLink", ].close(); } + testLogger.info( + `Underlying link should be closed: ${receiver.isClosed}. This will force us to use the management link to settle. Will now attempt to dead letter.` + ); + let errorWasThrown = false; try { await receiver.deadLetterMessage(msg); + + testLogger.info(`Message has been dead lettered`); } catch (err) { + testLogger.error(`Exception thrown`, err); + should.equal( err.message, `Failed to ${DispositionType.deadletter} the message as the AMQP link with which the message was received is no longer alive.`, @@ -288,6 +304,9 @@ describe("Message settlement After Receiver is Closed - Through ManagementLink", should.equal(errorWasThrown, false, "Error was thrown for sessions without session-id"); } + testLogger.info( + `Creating a peek lock dead letter receiver and attempting to receive the dead lettered message` + ); receiver = await serviceBusClient.test.createPeekLockReceiver(entityNames); if (!entityNames.usesSessions) { @@ -310,22 +329,30 @@ describe("Message settlement After Receiver is Closed - Through ManagementLink", "MessageId is different than expected" ); + testLogger.info(`Attempting to complete the message: ${deadLetterMsgsBatch[0].messageId}`); await receiver.completeMessage(deadLetterMsgsBatch[0]); - await testPeekMsgsLength(deadLetterReceiver, 0); } else { const messageBatch = await receiver.receiveMessages(1); - await receiver.completeMessage(messageBatch[0]); + testLogger.info(`Attempting to complete the message: ${messageBatch[0].messageId}`); + await receiver.completeMessage(messageBatch[0]); await testPeekMsgsLength(receiver, 0); } + + testLogger.info(`Done testing dead letter`); } it( noSessionTestClientType + ": deadLetter() moves message to deadletter queue", async function(): Promise { - await beforeEachTest(noSessionTestClientType); - await testDeadletter(); + enableCommonLoggers(); + try { + await beforeEachTest(noSessionTestClientType); + await testDeadletter(); + } finally { + disableCommonLoggers(); + } } ); diff --git a/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts index 60c72f14c7d5..a5ee9c1fa6bc 100644 --- a/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts @@ -18,14 +18,15 @@ import { getRandomTestClientTypeWithSessions } from "../public/utils/testutils2"; import { AbortController } from "@azure/abort-controller"; -import { Receiver, ReceiverEvents } from "rhea-promise"; +import { Receiver } from "rhea-promise"; import { ServiceBusSessionReceiver, ServiceBusSessionReceiverImpl } from "../../src/receivers/sessionReceiver"; -import { ConnectionContext } from "../../src/connectionContext"; import { LinkEntity } from "../../src/core/linkEntity"; -import { StandardAbortMessage } from "@azure/core-amqp"; +import { Constants, StandardAbortMessage } from "@azure/core-amqp"; +import { BatchingReceiver } from "../../src/core/batchingReceiver"; +import { disableCommonLoggers, enableCommonLoggers, testLogger } from "./utils/misc"; const should = chai.should(); chai.use(chaiAsPromised); @@ -807,37 +808,15 @@ describe("Batching Receiver", () => { }); describe("Batch Receiver - disconnects", () => { - function simulateDisconnectDuringDrain( - receiverContext: ConnectionContext, - batchingReceiver: LinkEntity | undefined, - didRequestDrainResolver: Function - ) { - if (!batchingReceiver || !batchingReceiver.isOpen()) { - throw new Error(`batchingReceiver is not open or passed undefined.`); - } - // We want to simulate a disconnect once the batching receiver is draining. - // We can detect when the receiver enters a draining state when `addCredit` is - // called while didRequestDrainResolver is called to resolve the promise. - const addCredit = batchingReceiver["link"]!.addCredit; - batchingReceiver["link"]!.addCredit = function(credits) { - // This makes sure the receiveMessages doesn't end because of draining before the disconnect is triggered - // Meaning.. the "resolving the messages" can only happen through the onDetached triggered by disconnect - batchingReceiver["link"]!.removeAllListeners(ReceiverEvents.receiverDrained); - addCredit.call(this, credits); - if (batchingReceiver["link"]!.drain) { - didRequestDrainResolver(); - // Simulate a disconnect being called with a non-retryable error. - receiverContext.connection["_connection"].idle(); - } - }; - } - - describe(noSessionTestClientType + ": Batch Receiver - disconnects", function(): void { + describe("Batch Receiver - disconnects (non-session)", function(): void { before(() => { + enableCommonLoggers(); + console.log(`Entity type: ${noSessionTestClientType}`); serviceBusClient = createServiceBusClientForTests(); }); after(() => { + disableCommonLoggers(); return serviceBusClient.test.after(); }); @@ -890,37 +869,81 @@ describe("Batching Receiver", () => { void > { // Create the sender and receiver. + + testLogger.info("Before the test"); + await beforeEachTest(noSessionTestClientType, "receiveAndDelete"); // The first time `receiveMessages` is called the receiver link is created. // The `receiver_drained` handler is only added after the link is created, // which is a non-blocking task. - await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); - const receiverContext = (receiver as ServiceBusReceiverImpl)["_context"]; + + testLogger.info( + "Receiving a single message to warm up receiver (there isn't one, so this should just time out)" + ); + + await receiver.receiveMessages(1); + + testLogger.info("After receiving our non-existent warmup message"); + const batchingReceiver = (receiver as ServiceBusReceiverImpl)["_batchingReceiver"]; + testLogger.info("Sending first message"); + // Send a message so we have something to receive. await sender.sendMessages(TestMessage.getSample()); - const didRequestDrain = new Promise((resolve) => { - simulateDisconnectDuringDrain(receiverContext, batchingReceiver, resolve); - }); + const { onDetachedCalledPromise } = causeDisconnectDuringDrain(batchingReceiver); + + testLogger.info("Receiving first message + 9 more (forces a drain to happen)"); // Purposefully request more messages than what's available // so that the receiver will have to drain. - const messages1 = await receiver.receiveMessages(10, { maxWaitTimeInMs: 1000 }); + const messages1 = await receiver.receiveMessages(10); + + testLogger.info( + `Receiving done, got ${messages1.length} messages, now waiting for detach event since we forced a .idle()` + ); + + const result = await Promise.all([ + onDetachedCalledPromise, + delay( + Constants.defaultOperationTimeoutInMs * 1.5, + undefined, + undefined, + "ondetachednevercalled" + ) + ]); + + if (typeof result === "string" && result === "ondetachednevercalled") { + assert.fail("ondetached was never called for the receiver"); + } - await didRequestDrain; messages1.length.should.equal(1, "Unexpected number of messages received."); // Make sure that a 2nd receiveMessages call still works // by sending and receiving a single message again. - await sender.sendMessages(TestMessage.getSample()); + const sampleMessage = TestMessage.getSample(); + + testLogger.info( + "Sending another sample message for our 'receiver after interrupted batch receiver' receiver" + ); + + await sender.sendMessages(sampleMessage); + + testLogger.info("Message sent, now attempting to receive"); // wait for the 2nd message to be received. - const messages2 = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); + // NOTE: we've forced the connection to restart at this point - it's quite possible to get errors + // while we attempt to receive, so we need to handle that. + const messages2 = await receiver.receiveMessages(1); - messages2.length.should.equal(1, "Unexpected number of messages received."); + testLogger.info("Messages received: ${messages2.length}"); + + assert.deepEqual( + messages2!.map((msg) => msg.body), + [sampleMessage.body] + ); }); it("throws an error if drain is in progress (peekLock)", async function(): Promise { @@ -931,34 +954,34 @@ describe("Batching Receiver", () => { // The `receiver_drained` handler is only added after the link is created, // which is a non-blocking task. await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); - const receiverContext = (receiver as ServiceBusReceiverImpl)["_context"]; const batchingReceiver = (receiver as ServiceBusReceiverImpl)["_batchingReceiver"]; // Send a message so we have something to receive. await sender.sendMessages(TestMessage.getSample()); - const didRequestDrain = new Promise((resolve) => { - simulateDisconnectDuringDrain(receiverContext, batchingReceiver, resolve); - }); + const { onDetachedCalledPromise } = causeDisconnectDuringDrain(batchingReceiver); // Purposefully request more messages than what's available // so that the receiver will have to drain. const testFailureMessage = "Test failure"; try { - await receiver.receiveMessages(10, { maxWaitTimeInMs: 1000 }); + await receiver.receiveMessages(10); throw new Error(testFailureMessage); } catch (err) { - err.message && err.message.should.not.equal(testFailureMessage); + assert.deepNestedInclude(err, { + name: "Error", + message: "Test: fake connection failure" + }); } - await didRequestDrain; + await onDetachedCalledPromise; // Make sure that a 2nd receiveMessages call still works // by sending and receiving a single message again. await sender.sendMessages(TestMessage.getSample()); // wait for the 2nd message to be received. - const messages = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); + const messages = await receiver.receiveMessages(1); messages.length.should.equal(1, "Unexpected number of messages received."); }); @@ -1045,11 +1068,15 @@ describe("Batching Receiver", () => { }); }); - describe(withSessionTestClientType + ": Batch Receiver - disconnects", function(): void { + describe("Session Batch Receiver - disconnects (sessions)", function(): void { let serviceBusClient: ServiceBusClientForTests; let sender: ServiceBusSender; let receiver: ServiceBusSessionReceiver; + before(() => { + console.log(`Entity type: ${withSessionTestClientType}`); + }); + async function beforeEachTest( receiveMode: "peekLock" | "receiveAndDelete" = "peekLock" ): Promise { @@ -1136,21 +1163,19 @@ describe("Batching Receiver", () => { "Unexpected number of received messages(before disconnect)." ); - const receiverContext = (receiver as ServiceBusSessionReceiverImpl)["_context"]; const batchingReceiver = (receiver as ServiceBusSessionReceiverImpl)["_messageSession"]; // Send a message so we have something to receive. await sender.sendMessages(TestMessage.getSessionSample()); - const didRequestDrain = new Promise((resolve) => { - simulateDisconnectDuringDrain(receiverContext, batchingReceiver, resolve); - }); + const { onDetachedCalledPromise } = causeDisconnectDuringDrain(batchingReceiver); // Purposefully request more messages than what's available // so that the receiver will have to drain. const messages2 = await receiver.receiveMessages(10); - await didRequestDrain; + await onDetachedCalledPromise; + messages2.length.should.equal( 1, "Unexpected number of messages received(during disconnect)." @@ -1196,29 +1221,29 @@ describe("Batching Receiver", () => { "Unexpected number of received messages(before disconnect)." ); - const receiverContext = (receiver as ServiceBusSessionReceiverImpl)["_context"]; const batchingReceiver = (receiver as ServiceBusSessionReceiverImpl)["_messageSession"]; // Send a message so we have something to receive. await sender.sendMessages(TestMessage.getSessionSample()); - const didRequestDrain = new Promise((resolve) => { - simulateDisconnectDuringDrain(receiverContext, batchingReceiver, resolve); - }); + const { onDetachedCalledPromise: drainRequestedPromise } = causeDisconnectDuringDrain( + batchingReceiver + ); // Purposefully request more messages than what's available // so that the receiver will have to drain. const testFailureMessage = "Test failure"; try { - await receiver.receiveMessages(10, { maxWaitTimeInMs: 5000 }); + await receiver.receiveMessages(10); throw new Error(testFailureMessage); } catch (err) { - err.message && - err.code.should.equal("SessionLockLost") && - err.message.should.not.equal(testFailureMessage); + assert.deepNestedInclude(err, { + name: "Error", + message: "Test: fake connection failure" + }); } - await didRequestDrain; + await drainRequestedPromise; }); it("returns messages if receive in progress (receiveAndDelete)", async function(): Promise< @@ -1318,14 +1343,70 @@ describe("Batching Receiver", () => { // so that the receiver will have to drain. const testFailureMessage = "Test failure"; try { - await receiver.receiveMessages(10, { maxWaitTimeInMs: 5000 }); + await receiver.receiveMessages(10); throw new Error(testFailureMessage); } catch (err) { - err.message && - err.code.should.equal("SessionLockLost") && - err.message.should.not.equal(testFailureMessage); + assert.deepNestedInclude(err, { + name: "ServiceBusError", + code: "SessionLockLost" + }); } }); }); }); }); + +/** + * Sets `batchingReceiver` so it's next drain call will result in the connection recycling prior to the + * drain completing. The primary use is just to make sure that when we terminate a receiveMessages() call + * early due to a disconnect (and interrupt the drain) that the link is restored and can be used + * again afterwards. + * + * The `onDetachedCalledPromise` property in the return object allows you to await until the batching + * receiver has actually been detached. + * + * @param batchingReceiver A batching receiver (minimal interface compatible with sessions and non-sessions) + * @returns an object with `onDetachedCalledPromise` that resolves when onDetached has completed + * for the batching receiver. + */ +function causeDisconnectDuringDrain( + batchingReceiver: (Pick & LinkEntity) | undefined +): { onDetachedCalledPromise: Promise } { + let resolveOnDetachedCallPromise: () => void; + + let onDetachedCalledPromise = new Promise((resolve) => { + resolveOnDetachedCallPromise = resolve; + }); + + if (!batchingReceiver || !batchingReceiver.isOpen()) { + throw new Error(`batchingReceiver is not open or passed undefined.`); + } + + const link = batchingReceiver["link"]; + + if (link == null) { + throw new Error("No active link for batching receiver"); + } + + const origAddCredit = link.addCredit; + + // We want to simulate a disconnect once the batching receiver is draining. + // We can detect when the receiver enters a draining state when `addCredit` is + // called while didRequestDrainResolver is called to resolve the promise. + const addCreditThatImmediatelyDetaches = function(credits: number): void { + origAddCredit.call(link, credits); + + if (link.drain && credits === 1) { + // initiate the detach now (prior to any possibilty of the 'drain' call being scheduled) + batchingReceiver + .onDetached(new Error("Test: fake connection failure")) + .then(() => resolveOnDetachedCallPromise()); + } + }; + + link["addCredit"] = addCreditThatImmediatelyDetaches; + + return { + onDetachedCalledPromise + }; +} diff --git a/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts b/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts index 1b2467a97266..6bf5b22250fa 100644 --- a/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts @@ -5,6 +5,7 @@ import { EnvironmentCredential } from "@azure/identity"; import chai from "chai"; import chaiAsPromised from "chai-as-promised"; import * as dotenv from "dotenv"; +import { Constants as CoreAmqpConstants } from "@azure/core-amqp"; import Long from "long"; import { isServiceBusError, @@ -294,7 +295,11 @@ describe("ServiceBusClient live tests", () => { }); should.equal( - await checkWithTimeout(() => errorWasThrown === true, 10, 3000), + await checkWithTimeout( + () => errorWasThrown === true, + 1000, + CoreAmqpConstants.defaultOperationTimeoutInMs * 2 // arbitrary, just don't want it to be too short. + ), true, "Error thrown flag must be true" ); diff --git a/sdk/servicebus/service-bus/test/internal/tracing.spec.ts b/sdk/servicebus/service-bus/test/internal/tracing.spec.ts index 25be1312348d..ef6d64e08351 100644 --- a/sdk/servicebus/service-bus/test/internal/tracing.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/tracing.spec.ts @@ -12,14 +12,12 @@ import { function legacyOptionsUsingSpanContext(rootSpan: TestSpan): Pick { return { - // @ts-ignore Using the deprecated field for testing - parentSpan: rootSpan.context() + parentSpan: rootSpan.spanContext() }; } function legacyOptionsUsingSpan(rootSpan: TestSpan): Pick { return { - // @ts-ignore Using the deprecated field for testing parentSpan: rootSpan }; } diff --git a/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts index 0e10d6bf7ada..f8b3080d9bdf 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts @@ -21,6 +21,7 @@ import { AbortSignalLike } from "@azure/abort-controller"; import { ServiceBusSessionReceiverImpl } from "../../../src/receivers/sessionReceiver"; import { MessageSession } from "../../../src/session/messageSession"; import sinon from "sinon"; +import { assertThrows } from "../../public/utils/testUtils"; describe("Receiver unit tests", () => { describe("init() and close() interactions", () => { @@ -43,9 +44,11 @@ describe("Receiver unit tests", () => { }; // make an init() happen internally. - const emptyArrayOfMessages = await batchingReceiver.receive(1, 1, 1, {}); - - assert.isEmpty(emptyArrayOfMessages); + await assertThrows(() => batchingReceiver.receive(1, 1, 1, {}), { + name: "ServiceBusError", + code: "GeneralError", + message: "Link closed before receiving messages." + }); assert.isTrue(initWasCalled); }); diff --git a/sdk/servicebus/service-bus/test/internal/utils/misc.ts b/sdk/servicebus/service-bus/test/internal/utils/misc.ts index 06b52e521046..41cea80aeda0 100644 --- a/sdk/servicebus/service-bus/test/internal/utils/misc.ts +++ b/sdk/servicebus/service-bus/test/internal/utils/misc.ts @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. +import { createClientLogger, setLogLevel } from "@azure/logger"; import { Delivery, ServiceBusReceivedMessage } from "../../../src"; import { ServiceBusMessageImpl } from "../../../src/serviceBusMessage"; @@ -17,3 +18,13 @@ export function getDeliveryProperty(message: ServiceBusReceivedMessage): Deliver "Received message does not contain a .delivery member - not a ServiceBusMessageImpl instance." ); } + +export const testLogger = createClientLogger("test"); + +export function enableCommonLoggers() { + setLogLevel("verbose"); +} + +export function disableCommonLoggers() { + setLogLevel(); +}