Skip to content

Commit

Permalink
[service-bus] Fix issue on drain where onDrain and onMessage migh…
Browse files Browse the repository at this point in the history
…t sequence incorrectly. (#12772)

Fixes an issue where drain could potentially resolve a promise even though some messages are pending processing.

This can happen if the user calls receiveMessages(5), for instance, and the timeout expires before they receive 
them all - when this happens we then 'drain' the excess credits by sending a request to Service Bus on the link. Now, let's say that before the drain _response_ comes in we receive some of the other messages (after all, we requested 5). Those messages are then, via setTimeout(emit, 0), scheduled to be sent to our onMessage handler. They are not dispatched to us immediately.

Now, when the drain response arrives (prior to this fix) the drain is processed immediately (unlike other messages the rhea-promise code does not call it with setTimeout), potentially bypassing any onMessage callbacks that are still waiting to be processed in the task queue.

By using setTimeout() here we use the same technique that the messages are scheduled by, ensuring that the resolve() is processed after any of those pending callbacks.

Fixes #12711 (potentially)
  • Loading branch information
richardpark-msft authored Dec 7, 2020
1 parent 1cc6607 commit 20bed38
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 9 deletions.
2 changes: 2 additions & 0 deletions sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
- Updates documentation for `ServiceBusMessage` to call out that the `body` field
must be converted to a byte array or `Buffer` when cross-language
compatibility while receiving events is required.
- Fix issue where receiveMessages might return fewer messages than were received, causing them to be potentially locked or lost.
[PR 12772](https://github.com/Azure/azure-sdk-for-js/pull/12772)

## 7.0.0 (2020-11-23)

Expand Down
8 changes: 7 additions & 1 deletion sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,13 @@ export class BatchingReceiverLite {
`${loggingPrefix} Drained, resolving receiveMessages() with ${brokeredMessages.length} messages.`
);

resolve(brokeredMessages);
// NOTE: through rhea-promise most of our event handlers are made asynchronous by calling setTimeout(emit).
// However, a small set (*error and drain) execute immediately. This can lead to a situation where the logical
// ordering of events is correct but the execution order is incorrect because the events are not all getting
// put into the task queue the same way.
// So this call, while odd, just ensures that we resolve _after_ any already-queued onMessage handlers that may
// be waiting in the task queue.
setTimeout(() => resolve(brokeredMessages));
};

cleanupBeforeResolveOrReject = (
Expand Down
91 changes: 86 additions & 5 deletions sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import { ConnectionContext } from "../../src/connectionContext";
import { ServiceBusReceiverImpl } from "../../src/receivers/receiver";
import { OperationOptionsBase } from "../../src/modelsToBeSharedWithEventHubs";
import { ReceiveMode } from "../../src/models";
import { Constants } from "@azure/core-amqp";

describe("BatchingReceiver unit tests", () => {
let closeables: { close(): Promise<void> }[];
Expand Down Expand Up @@ -267,7 +268,8 @@ describe("BatchingReceiver unit tests", () => {
closeables.push(receiver);

const { receiveIsReady, emitter, remainingRegisteredListeners } = setupBatchingReceiver(
receiver
receiver,
clock
);

const receivePromise = receiver.receive(3, bigTimeout, littleTimeout, {});
Expand Down Expand Up @@ -373,7 +375,8 @@ describe("BatchingReceiver unit tests", () => {
closeables.push(receiver);

const { receiveIsReady, emitter, remainingRegisteredListeners } = setupBatchingReceiver(
receiver
receiver,
clock
);

let wasCalled = false;
Expand Down Expand Up @@ -420,7 +423,8 @@ describe("BatchingReceiver unit tests", () => {
);

function setupBatchingReceiver(
batchingReceiver: BatchingReceiver
batchingReceiver: BatchingReceiver,
clock?: ReturnType<typeof sinon.useFakeTimers>
): {
receiveIsReady: Promise<void>;
emitter: EventEmitter;
Expand All @@ -431,7 +435,7 @@ describe("BatchingReceiver unit tests", () => {
emitter,
remainingRegisteredListeners,
receiveIsReady
} = createFakeReceiver();
} = createFakeReceiver(clock);

batchingReceiver["_link"] = fakeRheaReceiver;

Expand All @@ -450,7 +454,9 @@ describe("BatchingReceiver unit tests", () => {
});
});

function createFakeReceiver(): {
function createFakeReceiver(
clock?: ReturnType<typeof sinon.useFakeTimers>
): {
receiveIsReady: Promise<void>;
emitter: EventEmitter;
remainingRegisteredListeners: Set<string>;
Expand Down Expand Up @@ -500,6 +506,7 @@ describe("BatchingReceiver unit tests", () => {
if (_credits === 1 && fakeRheaReceiver.drain === true) {
// special case - if we're draining we should initiate a drain
emitter.emit(ReceiverEvents.receiverDrained, undefined);
clock?.runAll();
} else {
credits += _credits;
}
Expand Down Expand Up @@ -658,4 +665,78 @@ describe("BatchingReceiver unit tests", () => {
assert.isEmpty(results);
});
});

it("drain doesn't resolve before message callbacks have completed", async () => {
const { fakeRheaReceiver, emitter, receiveIsReady } = createFakeReceiver();

const receiver = new BatchingReceiverLite(
createConnectionContextForTests(),
"fakeEntityPath",
async () => {
return fakeRheaReceiver;
},
"peekLock"
);

const receiveMessagesPromise = receiver
.receiveMessages({
maxMessageCount: 3,
maxTimeAfterFirstMessageInMs: 5000,
maxWaitTimeInMs: 5000
})
.then((messages) => {
console.log(`===> then running, messages: ${messages.map((m) => m.body).join(", ")}`);
return [...messages];
});

await receiveIsReady;

// We've had an issue in the past where it seemed that drain was
// causing us to potentially not return messages that should have
// existed. In our tests this is very hard to reproduce because it
// requires us to control more of how the stream of events are
// returned in Service Bus.

// Our suspicion has been that it's possible we're receiving messages _after_ a
// drain has occurred but we've never seen it in the wild or in any of our testing.

// However, in rhea-promise there is an odd mismatch of dispatching that can cause this
// sequence of events to occur:
// 1. rhea-promise: receive message A
// rhea-promise then calls: setTimeout(emit(messageA))
// 2. us: decide to drain (timeout expired)
// 3. rhea-promise: sends drain
// 4. rhea-promise: receives message B (Service Bus has not yet processed the drain request)
// rhea-promise then calls: setTimeout(emit(messageB))
//
// Now at this point we have the setTimeout(emit(messageB)) in the task queue. It'll be
// executed at the next turn of the event loop.
//
// The problem then comes in when rhea-promise receives the receiver_drained event:
// 4. rhea-promise: receives receiver_drain event
// emit(drain) // note it does _not_ use setTimeout()
//
// This causes the drain event to fire immediately. When it resolves the underlying promise
// it resolves it prior to emit(messageB) firing, resulting in lost messages.
//
// To fix this when we get receive_drained we setTimeout(resolve) instead of just immediately resolving. This allows
// us to enter into the same task queue as all the message callbacks, and makes it so everything occurs in the
// right order.
setTimeout(() => {
emitter.emit(ReceiverEvents.message, {
message: {
body: "the first message",
message_annotations: {
[Constants.enqueuedTime]: 0
}
} as RheaMessage
} as EventContext);
});

emitter.emit(ReceiverEvents.receiverDrained, {} as EventContext);

const results = await receiveMessagesPromise;

assert.equal(1, results.length);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ describe("Message session unit tests", () => {
}
);

const { receiveIsReady, emitter } = setupFakeReceiver(receiver);
const { receiveIsReady, emitter } = setupFakeReceiver(receiver, clock);

const receivePromise = receiver.receiveMessages(3, bigTimeout, littleTimeout, {});
await receiveIsReady;
Expand Down Expand Up @@ -218,7 +218,7 @@ describe("Message session unit tests", () => {
}
);

const { receiveIsReady, emitter } = setupFakeReceiver(receiver);
const { receiveIsReady, emitter } = setupFakeReceiver(receiver, clock);

let wasCalled = false;

Expand Down Expand Up @@ -264,7 +264,8 @@ describe("Message session unit tests", () => {
});

function setupFakeReceiver(
batchingReceiver: MessageSession
batchingReceiver: MessageSession,
clock?: ReturnType<typeof sinon.useFakeTimers>
): {
receiveIsReady: Promise<void>;
emitter: EventEmitter;
Expand Down Expand Up @@ -314,6 +315,7 @@ describe("Message session unit tests", () => {
if (_credits === 1 && fakeRheaReceiver.drain === true) {
// special case - if we're draining we should initiate a drain
emitter.emit(ReceiverEvents.receiverDrained, undefined);
clock?.runAll();
} else {
credits += _credits;
}
Expand Down

0 comments on commit 20bed38

Please sign in to comment.