From 20bed382cfebc9351a2aed4a35a0b68fa7c0a393 Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Mon, 7 Dec 2020 12:14:58 -0800 Subject: [PATCH] [service-bus] Fix issue on drain where `onDrain` and `onMessage` might sequence incorrectly. (#12772) Fixes an issue where drain could potentially resolve a promise even though some messages are pending processing. This can happen if the user calls receiveMessages(5), for instance, and the timeout expires before they receive them all - when this happens we then 'drain' the excess credits by sending a request to Service Bus on the link. Now, let's say that before the drain _response_ comes in we receive some of the other messages (after all, we requested 5). Those messages are then, via setTimeout(emit, 0), scheduled to be sent to our onMessage handler. They are not dispatched to us immediately. Now, when the drain response arrives (prior to this fix) the drain is processed immediately (unlike other messages the rhea-promise code does not call it with setTimeout), potentially bypassing any onMessage callbacks that are still waiting to be processed in the task queue. By using setTimeout() here we use the same technique that the messages are scheduled by, ensuring that the resolve() is processed after any of those pending callbacks. Fixes #12711 (potentially) --- sdk/servicebus/service-bus/CHANGELOG.md | 2 + .../service-bus/src/core/batchingReceiver.ts | 8 +- .../test/internal/batchingReceiver.spec.ts | 91 ++++++++++++++++++- .../test/internal/messageSession.spec.ts | 8 +- 4 files changed, 100 insertions(+), 9 deletions(-) diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index 7298a1c532ef..2e03e5fe4b1d 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -5,6 +5,8 @@ - Updates documentation for `ServiceBusMessage` to call out that the `body` field must be converted to a byte array or `Buffer` when cross-language compatibility while receiving events is required. +- Fix issue where receiveMessages might return fewer messages than were received, causing them to be potentially locked or lost. + [PR 12772](https://github.com/Azure/azure-sdk-for-js/pull/12772) ## 7.0.0 (2020-11-23) diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index a1245921d8b0..d465bf673a49 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -458,7 +458,13 @@ export class BatchingReceiverLite { `${loggingPrefix} Drained, resolving receiveMessages() with ${brokeredMessages.length} messages.` ); - resolve(brokeredMessages); + // NOTE: through rhea-promise most of our event handlers are made asynchronous by calling setTimeout(emit). + // However, a small set (*error and drain) execute immediately. This can lead to a situation where the logical + // ordering of events is correct but the execution order is incorrect because the events are not all getting + // put into the task queue the same way. + // So this call, while odd, just ensures that we resolve _after_ any already-queued onMessage handlers that may + // be waiting in the task queue. + setTimeout(() => resolve(brokeredMessages)); }; cleanupBeforeResolveOrReject = ( diff --git a/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts index d6f9bf59f11d..73aaddf9bdff 100644 --- a/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts @@ -32,6 +32,7 @@ import { ConnectionContext } from "../../src/connectionContext"; import { ServiceBusReceiverImpl } from "../../src/receivers/receiver"; import { OperationOptionsBase } from "../../src/modelsToBeSharedWithEventHubs"; import { ReceiveMode } from "../../src/models"; +import { Constants } from "@azure/core-amqp"; describe("BatchingReceiver unit tests", () => { let closeables: { close(): Promise }[]; @@ -267,7 +268,8 @@ describe("BatchingReceiver unit tests", () => { closeables.push(receiver); const { receiveIsReady, emitter, remainingRegisteredListeners } = setupBatchingReceiver( - receiver + receiver, + clock ); const receivePromise = receiver.receive(3, bigTimeout, littleTimeout, {}); @@ -373,7 +375,8 @@ describe("BatchingReceiver unit tests", () => { closeables.push(receiver); const { receiveIsReady, emitter, remainingRegisteredListeners } = setupBatchingReceiver( - receiver + receiver, + clock ); let wasCalled = false; @@ -420,7 +423,8 @@ describe("BatchingReceiver unit tests", () => { ); function setupBatchingReceiver( - batchingReceiver: BatchingReceiver + batchingReceiver: BatchingReceiver, + clock?: ReturnType ): { receiveIsReady: Promise; emitter: EventEmitter; @@ -431,7 +435,7 @@ describe("BatchingReceiver unit tests", () => { emitter, remainingRegisteredListeners, receiveIsReady - } = createFakeReceiver(); + } = createFakeReceiver(clock); batchingReceiver["_link"] = fakeRheaReceiver; @@ -450,7 +454,9 @@ describe("BatchingReceiver unit tests", () => { }); }); - function createFakeReceiver(): { + function createFakeReceiver( + clock?: ReturnType + ): { receiveIsReady: Promise; emitter: EventEmitter; remainingRegisteredListeners: Set; @@ -500,6 +506,7 @@ describe("BatchingReceiver unit tests", () => { if (_credits === 1 && fakeRheaReceiver.drain === true) { // special case - if we're draining we should initiate a drain emitter.emit(ReceiverEvents.receiverDrained, undefined); + clock?.runAll(); } else { credits += _credits; } @@ -658,4 +665,78 @@ describe("BatchingReceiver unit tests", () => { assert.isEmpty(results); }); }); + + it("drain doesn't resolve before message callbacks have completed", async () => { + const { fakeRheaReceiver, emitter, receiveIsReady } = createFakeReceiver(); + + const receiver = new BatchingReceiverLite( + createConnectionContextForTests(), + "fakeEntityPath", + async () => { + return fakeRheaReceiver; + }, + "peekLock" + ); + + const receiveMessagesPromise = receiver + .receiveMessages({ + maxMessageCount: 3, + maxTimeAfterFirstMessageInMs: 5000, + maxWaitTimeInMs: 5000 + }) + .then((messages) => { + console.log(`===> then running, messages: ${messages.map((m) => m.body).join(", ")}`); + return [...messages]; + }); + + await receiveIsReady; + + // We've had an issue in the past where it seemed that drain was + // causing us to potentially not return messages that should have + // existed. In our tests this is very hard to reproduce because it + // requires us to control more of how the stream of events are + // returned in Service Bus. + + // Our suspicion has been that it's possible we're receiving messages _after_ a + // drain has occurred but we've never seen it in the wild or in any of our testing. + + // However, in rhea-promise there is an odd mismatch of dispatching that can cause this + // sequence of events to occur: + // 1. rhea-promise: receive message A + // rhea-promise then calls: setTimeout(emit(messageA)) + // 2. us: decide to drain (timeout expired) + // 3. rhea-promise: sends drain + // 4. rhea-promise: receives message B (Service Bus has not yet processed the drain request) + // rhea-promise then calls: setTimeout(emit(messageB)) + // + // Now at this point we have the setTimeout(emit(messageB)) in the task queue. It'll be + // executed at the next turn of the event loop. + // + // The problem then comes in when rhea-promise receives the receiver_drained event: + // 4. rhea-promise: receives receiver_drain event + // emit(drain) // note it does _not_ use setTimeout() + // + // This causes the drain event to fire immediately. When it resolves the underlying promise + // it resolves it prior to emit(messageB) firing, resulting in lost messages. + // + // To fix this when we get receive_drained we setTimeout(resolve) instead of just immediately resolving. This allows + // us to enter into the same task queue as all the message callbacks, and makes it so everything occurs in the + // right order. + setTimeout(() => { + emitter.emit(ReceiverEvents.message, { + message: { + body: "the first message", + message_annotations: { + [Constants.enqueuedTime]: 0 + } + } as RheaMessage + } as EventContext); + }); + + emitter.emit(ReceiverEvents.receiverDrained, {} as EventContext); + + const results = await receiveMessagesPromise; + + assert.equal(1, results.length); + }); }); diff --git a/sdk/servicebus/service-bus/test/internal/messageSession.spec.ts b/sdk/servicebus/service-bus/test/internal/messageSession.spec.ts index f9f35502a405..42ea82146fa9 100644 --- a/sdk/servicebus/service-bus/test/internal/messageSession.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/messageSession.spec.ts @@ -122,7 +122,7 @@ describe("Message session unit tests", () => { } ); - const { receiveIsReady, emitter } = setupFakeReceiver(receiver); + const { receiveIsReady, emitter } = setupFakeReceiver(receiver, clock); const receivePromise = receiver.receiveMessages(3, bigTimeout, littleTimeout, {}); await receiveIsReady; @@ -218,7 +218,7 @@ describe("Message session unit tests", () => { } ); - const { receiveIsReady, emitter } = setupFakeReceiver(receiver); + const { receiveIsReady, emitter } = setupFakeReceiver(receiver, clock); let wasCalled = false; @@ -264,7 +264,8 @@ describe("Message session unit tests", () => { }); function setupFakeReceiver( - batchingReceiver: MessageSession + batchingReceiver: MessageSession, + clock?: ReturnType ): { receiveIsReady: Promise; emitter: EventEmitter; @@ -314,6 +315,7 @@ describe("Message session unit tests", () => { if (_credits === 1 && fakeRheaReceiver.drain === true) { // special case - if we're draining we should initiate a drain emitter.emit(ReceiverEvents.receiverDrained, undefined); + clock?.runAll(); } else { credits += _credits; }