diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index 7298a1c532ef..2e03e5fe4b1d 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -5,6 +5,8 @@ - Updates documentation for `ServiceBusMessage` to call out that the `body` field must be converted to a byte array or `Buffer` when cross-language compatibility while receiving events is required. +- Fix issue where receiveMessages might return fewer messages than were received, causing them to be potentially locked or lost. + [PR 12772](https://github.com/Azure/azure-sdk-for-js/pull/12772) ## 7.0.0 (2020-11-23) diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index a1245921d8b0..d465bf673a49 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -458,7 +458,13 @@ export class BatchingReceiverLite { `${loggingPrefix} Drained, resolving receiveMessages() with ${brokeredMessages.length} messages.` ); - resolve(brokeredMessages); + // NOTE: through rhea-promise most of our event handlers are made asynchronous by calling setTimeout(emit). + // However, a small set (*error and drain) execute immediately. This can lead to a situation where the logical + // ordering of events is correct but the execution order is incorrect because the events are not all getting + // put into the task queue the same way. + // So this call, while odd, just ensures that we resolve _after_ any already-queued onMessage handlers that may + // be waiting in the task queue. 
+ setTimeout(() => resolve(brokeredMessages)); }; cleanupBeforeResolveOrReject = ( diff --git a/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts index d6f9bf59f11d..73aaddf9bdff 100644 --- a/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts @@ -32,6 +32,7 @@ import { ConnectionContext } from "../../src/connectionContext"; import { ServiceBusReceiverImpl } from "../../src/receivers/receiver"; import { OperationOptionsBase } from "../../src/modelsToBeSharedWithEventHubs"; import { ReceiveMode } from "../../src/models"; +import { Constants } from "@azure/core-amqp"; describe("BatchingReceiver unit tests", () => { let closeables: { close(): Promise }[]; @@ -267,7 +268,8 @@ describe("BatchingReceiver unit tests", () => { closeables.push(receiver); const { receiveIsReady, emitter, remainingRegisteredListeners } = setupBatchingReceiver( - receiver + receiver, + clock ); const receivePromise = receiver.receive(3, bigTimeout, littleTimeout, {}); @@ -373,7 +375,8 @@ describe("BatchingReceiver unit tests", () => { closeables.push(receiver); const { receiveIsReady, emitter, remainingRegisteredListeners } = setupBatchingReceiver( - receiver + receiver, + clock ); let wasCalled = false; @@ -420,7 +423,8 @@ describe("BatchingReceiver unit tests", () => { ); function setupBatchingReceiver( - batchingReceiver: BatchingReceiver + batchingReceiver: BatchingReceiver, + clock?: ReturnType ): { receiveIsReady: Promise; emitter: EventEmitter; @@ -431,7 +435,7 @@ describe("BatchingReceiver unit tests", () => { emitter, remainingRegisteredListeners, receiveIsReady - } = createFakeReceiver(); + } = createFakeReceiver(clock); batchingReceiver["_link"] = fakeRheaReceiver; @@ -450,7 +454,9 @@ describe("BatchingReceiver unit tests", () => { }); }); - function createFakeReceiver(): { + function createFakeReceiver( + clock?: 
ReturnType + ): { receiveIsReady: Promise; emitter: EventEmitter; remainingRegisteredListeners: Set; @@ -500,6 +506,7 @@ describe("BatchingReceiver unit tests", () => { if (_credits === 1 && fakeRheaReceiver.drain === true) { // special case - if we're draining we should initiate a drain emitter.emit(ReceiverEvents.receiverDrained, undefined); + clock?.runAll(); } else { credits += _credits; } @@ -658,4 +665,78 @@ describe("BatchingReceiver unit tests", () => { assert.isEmpty(results); }); }); + + it("drain doesn't resolve before message callbacks have completed", async () => { + const { fakeRheaReceiver, emitter, receiveIsReady } = createFakeReceiver(); + + const receiver = new BatchingReceiverLite( + createConnectionContextForTests(), + "fakeEntityPath", + async () => { + return fakeRheaReceiver; + }, + "peekLock" + ); + + const receiveMessagesPromise = receiver + .receiveMessages({ + maxMessageCount: 3, + maxTimeAfterFirstMessageInMs: 5000, + maxWaitTimeInMs: 5000 + }) + .then((messages) => { + console.log(`===> then running, messages: ${messages.map((m) => m.body).join(", ")}`); + return [...messages]; + }); + + await receiveIsReady; + + // We've had an issue in the past where it seemed that drain was + // causing us to potentially not return messages that should have + // existed. In our tests this is very hard to reproduce because it + // requires us to control more of how the stream of events are + // returned in Service Bus. + + // Our suspicion has been that it's possible we're receiving messages _after_ a + // drain has occurred but we've never seen it in the wild or in any of our testing. + + // However, in rhea-promise there is an odd mismatch of dispatching that can cause this + // sequence of events to occur: + // 1. rhea-promise: receive message A + // rhea-promise then calls: setTimeout(emit(messageA)) + // 2. us: decide to drain (timeout expired) + // 3. rhea-promise: sends drain + // 4. 
rhea-promise: receives message B (Service Bus has not yet processed the drain request) + // rhea-promise then calls: setTimeout(emit(messageB)) + // + // Now at this point we have the setTimeout(emit(messageB)) in the task queue. It'll be + // executed at the next turn of the event loop. + // + // The problem then comes in when rhea-promise receives the receiver_drained event: + // 5. rhea-promise: receives receiver_drained event + // emit(drain) // note it does _not_ use setTimeout() + // + // This causes the drain event to fire immediately. When it resolves the underlying promise + // it resolves it prior to emit(messageB) firing, resulting in lost messages. + // + // To fix this, when we get receiver_drained we setTimeout(resolve) instead of just immediately resolving. This allows + // us to enter into the same task queue as all the message callbacks, and makes it so everything occurs in the + // right order. + setTimeout(() => { + emitter.emit(ReceiverEvents.message, { + message: { + body: "the first message", + message_annotations: { + [Constants.enqueuedTime]: 0 + } + } as RheaMessage + } as EventContext); + }); + + emitter.emit(ReceiverEvents.receiverDrained, {} as EventContext); + + const results = await receiveMessagesPromise; + + assert.equal(1, results.length); + }); }); diff --git a/sdk/servicebus/service-bus/test/internal/messageSession.spec.ts b/sdk/servicebus/service-bus/test/internal/messageSession.spec.ts index f9f35502a405..42ea82146fa9 100644 --- a/sdk/servicebus/service-bus/test/internal/messageSession.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/messageSession.spec.ts @@ -122,7 +122,7 @@ describe("Message session unit tests", () => { } ); - const { receiveIsReady, emitter } = setupFakeReceiver(receiver); + const { receiveIsReady, emitter } = setupFakeReceiver(receiver, clock); const receivePromise = receiver.receiveMessages(3, bigTimeout, littleTimeout, {}); await receiveIsReady; @@ -218,7 +218,7 @@ describe("Message session unit tests", 
() => { } ); - const { receiveIsReady, emitter } = setupFakeReceiver(receiver); + const { receiveIsReady, emitter } = setupFakeReceiver(receiver, clock); let wasCalled = false; @@ -264,7 +264,8 @@ describe("Message session unit tests", () => { }); function setupFakeReceiver( - batchingReceiver: MessageSession + batchingReceiver: MessageSession, + clock?: ReturnType ): { receiveIsReady: Promise; emitter: EventEmitter; @@ -314,6 +315,7 @@ describe("Message session unit tests", () => { if (_credits === 1 && fakeRheaReceiver.drain === true) { // special case - if we're draining we should initiate a drain emitter.emit(ReceiverEvents.receiverDrained, undefined); + clock?.runAll(); } else { credits += _credits; }