From a72e5e2d5a3d953c192fea66faf3eb34b53bc08f Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Wed, 6 Jan 2021 16:25:55 -0800 Subject: [PATCH] [service-bus] Making BatchingReceiver properly cleanup after an abort (#13073) Some more fixes related to the prior PRs around making BatchingReceiver behavior consistent: * When we are closed (without error) or if we're in receiveAndDelete mode, we use the setTimeout() to ensure that we run after all pending message callbacks have fired * We always cleanup our handlers before resolve/reject and it's made more clear if we're resolving immediately or resolving after pending message callbacks. I also did some small refactors to make unit testing a bit easier, which should help make it simpler to validate this code path in the future. Fixes #12922 --- sdk/servicebus/service-bus/CHANGELOG.md | 2 + .../service-bus/src/core/batchingReceiver.ts | 349 +++++++++--------- .../test/internal/batchingReceiver.spec.ts | 41 +- .../service-bus/test/internal/tracing.spec.ts | 6 +- 4 files changed, 214 insertions(+), 184 deletions(-) diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index 588bed883618..d35f4643c4cf 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -5,6 +5,8 @@ - Fix the `isNode` check to allow the package to be usable in Electron. [Bug 12983](https://github.com/Azure/azure-sdk-for-js/issues/12983) - Fix issue where receiveMessages might return fewer messages than were received, causing them to be potentially locked or lost. [PR 12772](https://github.com/Azure/azure-sdk-for-js/pull/12772) + [PR 12908](https://github.com/Azure/azure-sdk-for-js/pull/12908) + [PR 13073](https://github.com/Azure/azure-sdk-for-js/pull/13073) - Updates documentation for `ServiceBusMessage` to call out that the `body` field must be converted to a byte array or `Buffer` when cross-language compatibility while receiving events is required. diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index 4df99abb37a8..f3203ec0c78f 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -292,7 +292,9 @@ export class BatchingReceiverLite { return []; } - const messages = await this._receiveMessagesImpl(receiver, args); + const messages = await new Promise((resolve, reject) => + this._receiveMessagesImpl(receiver, args, resolve, reject) + ); this._createAndEndProcessingSpan(messages, this, this._connectionContext.config, args); return messages; } finally { @@ -315,8 +317,10 @@ export class BatchingReceiverLite { private _receiveMessagesImpl( receiver: MinimalReceiver, - args: ReceiveMessageArgs - ): Promise { + args: ReceiveMessageArgs, + origResolve: (messages: ServiceBusMessageImpl[]) => void, + origReject: (err: Error | AmqpError) => void + ): void { const getRemainingWaitTimeInMs = this._getRemainingWaitTimeInMsFn( args.maxWaitTimeInMs, args.maxTimeAfterFirstMessageInMs @@ -325,200 +329,205 @@ export class BatchingReceiverLite { const brokeredMessages: ServiceBusMessageImpl[] = []; const loggingPrefix = `[${receiver.connection.id}|r:${receiver.name}]`; - return new Promise((resolve, reject) => { - let totalWaitTimer: NodeJS.Timer | undefined; + let totalWaitTimer: NodeJS.Timer | undefined; - // eslint-disable-next-line prefer-const - let cleanupBeforeResolveOrReject: () => void; + // eslint-disable-next-line prefer-const + let cleanupBeforeResolveOrReject: () => void; - const onError: OnAmqpEvent = (context: EventContext) => { - cleanupBeforeResolveOrReject(); + const reject = (err: Error | AmqpError) => { + cleanupBeforeResolveOrReject(); + origReject(err); + }; - const eventType = context.session?.error != null ? "session_error" : "receiver_error"; - let error = context.session?.error || context.receiver?.error; - - if (error) { - error = translateServiceBusError(error); - logger.logError( - error, - `${loggingPrefix} '${eventType}' event occurred. Received an error` - ); - } else { - error = new ServiceBusError( - "An error occurred while receiving messages.", - "GeneralError" - ); - } - reject(error); - }; + const resolveImmediately = (result: ServiceBusMessageImpl[]) => { + cleanupBeforeResolveOrReject(); + origResolve(result); + }; - this._closeHandler = (error?: AmqpError | Error): void => { + const resolveAfterPendingMessageCallbacks = (result: ServiceBusMessageImpl[]) => { + // NOTE: through rhea-promise, most of our event handlers are made asynchronous by calling setTimeout(emit). + // However, a small set (*error and drain) execute immediately. This can lead to a situation where the logical + // ordering of events is correct but the execution order is incorrect because the events are not all getting + // put into the task queue the same way. + // setTimeout() ensures that we resolve _after_ any already-queued onMessage handlers that may + // be waiting in the task queue. + setTimeout(() => { cleanupBeforeResolveOrReject(); + origResolve(result); + }); + }; - if ( - // no error, just closing. Go ahead and return what we have. - error == null || - // Return the collected messages if in ReceiveAndDelete mode because otherwise they are lost forever - (this._receiveMode === "receiveAndDelete" && brokeredMessages.length) - ) { - logger.verbose( - `${loggingPrefix} Closing. Resolving with ${brokeredMessages.length} messages.` - ); - return resolve(brokeredMessages); - } - - reject(translateServiceBusError(error)); - }; - - let abortSignalCleanupFunction: (() => void) | undefined = undefined; - - // Final action to be performed after - // - maxMessageCount is reached or - // - maxWaitTime is passed or - // - newMessageWaitTimeoutInSeconds is passed since the last message was received - const finalAction = (): void => { - // Drain any pending credits. - if (receiver.isOpen() && receiver.credit > 0) { - logger.verbose(`${loggingPrefix} Draining leftover credits(${receiver.credit}).`); - - // Setting drain must be accompanied by a flow call (aliased to addCredit in this case). - receiver.drain = true; - receiver.addCredit(1); - } else { - cleanupBeforeResolveOrReject(); - - logger.verbose( - `${loggingPrefix} Resolving receiveMessages() with ${brokeredMessages.length} messages.` - ); - resolve(brokeredMessages); - } - }; - - // Action to be performed on the "message" event. - const onReceiveMessage: OnAmqpEventAsPromise = async (context: EventContext) => { - // TODO: this appears to be aggravating a bug that we need to look into more deeply. - // The same timeout+drain sequence should work fine for receiveAndDelete but it appears - // to cause problems. - if (this._receiveMode === "peekLock") { - if (brokeredMessages.length === 0) { - // We'll now remove the old timer (which was the overall `maxWaitTimeMs` timer) - // and replace it with another timer that is a (probably) much shorter interval. - // - // This allows the user to get access to received messages earlier and also gives us - // a chance to have fewer messages internally that could get lost if the user's - // app crashes in receiveAndDelete mode. - if (totalWaitTimer) clearTimeout(totalWaitTimer); - const remainingWaitTimeInMs = getRemainingWaitTimeInMs(); - totalWaitTimer = setTimeout(() => { - logger.verbose( - `${loggingPrefix} Batching, waited for ${remainingWaitTimeInMs} milliseconds after receiving the first message.` - ); - finalAction(); - }, remainingWaitTimeInMs); - } - } + const onError: OnAmqpEvent = (context: EventContext) => { + const eventType = context.session?.error != null ? "session_error" : "receiver_error"; + let error = context.session?.error || context.receiver?.error; - try { - const data: ServiceBusMessageImpl = this._createServiceBusMessage(context); - if (brokeredMessages.length < args.maxMessageCount) { - brokeredMessages.push(data); - } - } catch (err) { - const errObj = err instanceof Error ? err : new Error(JSON.stringify(err)); - logger.logError( - err, - `${loggingPrefix} Received an error while converting AmqpMessage to ServiceBusMessage` - ); - reject(errObj); - } - if (brokeredMessages.length === args.maxMessageCount) { - finalAction(); - } - }; + if (error) { + error = translateServiceBusError(error); + logger.logError(error, `${loggingPrefix} '${eventType}' event occurred. Received an error`); + } else { + error = new ServiceBusError("An error occurred while receiving messages.", "GeneralError"); + } + reject(error); + }; - const onClose: OnAmqpEventAsPromise = async (context: EventContext) => { - const type = context.session?.error != null ? "session_closed" : "receiver_closed"; - const error = context.session?.error || context.receiver?.error; + this._closeHandler = (error?: AmqpError | Error): void => { + if ( + // no error, just closing. Go ahead and return what we have. + error == null || + // Return the collected messages if in ReceiveAndDelete mode because otherwise they are lost forever + (this._receiveMode === "receiveAndDelete" && brokeredMessages.length) + ) { + logger.verbose( + `${loggingPrefix} Closing. Resolving with ${brokeredMessages.length} messages.` + ); - if (error) { - logger.logError(error, `${loggingPrefix} '${type}' event occurred. The associated error`); - } - }; + return resolveAfterPendingMessageCallbacks(brokeredMessages); + } - // Action to be performed on the "receiver_drained" event. - const onReceiveDrain: OnAmqpEvent = () => { - receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain); - receiver.drain = false; + reject(translateServiceBusError(error)); + }; + let abortSignalCleanupFunction: (() => void) | undefined = undefined; + + // Final action to be performed after + // - maxMessageCount is reached or + // - maxWaitTime is passed or + // - newMessageWaitTimeoutInSeconds is passed since the last message was received + const finalAction = (): void => { + // Drain any pending credits. + if (receiver.isOpen() && receiver.credit > 0) { + logger.verbose(`${loggingPrefix} Draining leftover credits(${receiver.credit}).`); + + // setting .drain and combining it with .addCredit results in (eventually) sending + // a drain request to Service Bus. When the drain completes rhea will call `onReceiveDrain` + // at which point we'll wrap everything up and resolve the promise. + receiver.drain = true; + receiver.addCredit(1); + } else { logger.verbose( - `${loggingPrefix} Drained, resolving receiveMessages() with ${brokeredMessages.length} messages.` + `${loggingPrefix} Resolving receiveMessages() with ${brokeredMessages.length} messages.` ); - // NOTE: through rhea-promise most of our event handlers are made asynchronous by calling setTimeout(emit). - // However, a small set (*error and drain) execute immediately. This can lead to a situation where the logical - // ordering of events is correct but the execution order is incorrect because the events are not all getting - // put into the task queue the same way. - // So this call, while odd, just ensures that we resolve _after_ any already-queued onMessage handlers that may - // be waiting in the task queue. - setTimeout(() => { - cleanupBeforeResolveOrReject(); - resolve(brokeredMessages); - }); - }; - - cleanupBeforeResolveOrReject = (): void => { - if (receiver != null) { - receiver.removeListener(ReceiverEvents.receiverError, onError); - receiver.removeListener(ReceiverEvents.message, onReceiveMessage); - receiver.session.removeListener(SessionEvents.sessionError, onError); - receiver.removeListener(ReceiverEvents.receiverClose, onClose); - receiver.session.removeListener(SessionEvents.sessionClose, onClose); - receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain); - } + // we can resolve immediately (ie, no setTimeout call) because we have no + // remaining messages (thus nothing to wait for) + resolveImmediately(brokeredMessages); + } + }; - if (totalWaitTimer) { - clearTimeout(totalWaitTimer); + // Action to be performed on the "message" event. + const onReceiveMessage: OnAmqpEventAsPromise = async (context: EventContext) => { + // TODO: this appears to be aggravating a bug that we need to look into more deeply. + // The same timeout+drain sequence should work fine for receiveAndDelete but it appears + // to cause problems. + if (this._receiveMode === "peekLock") { + if (brokeredMessages.length === 0) { + // We'll now remove the old timer (which was the overall `maxWaitTimeMs` timer) + // and replace it with another timer that is a (probably) much shorter interval. + // + // This allows the user to get access to received messages earlier and also gives us + // a chance to have fewer messages internally that could get lost if the user's + // app crashes. + if (totalWaitTimer) clearTimeout(totalWaitTimer); + const remainingWaitTimeInMs = getRemainingWaitTimeInMs(); + totalWaitTimer = setTimeout(() => { + logger.verbose( + `${loggingPrefix} Batching, waited for ${remainingWaitTimeInMs} milliseconds after receiving the first message.` + ); + finalAction(); + }, remainingWaitTimeInMs); } + } - if (abortSignalCleanupFunction) { - abortSignalCleanupFunction(); + try { + const data: ServiceBusMessageImpl = this._createServiceBusMessage(context); + if (brokeredMessages.length < args.maxMessageCount) { + brokeredMessages.push(data); } - abortSignalCleanupFunction = undefined; - }; + } catch (err) { + const errObj = err instanceof Error ? err : new Error(JSON.stringify(err)); + logger.logError( + err, + `${loggingPrefix} Received an error while converting AmqpMessage to ServiceBusMessage` + ); + reject(errObj); + } + if (brokeredMessages.length === args.maxMessageCount) { + finalAction(); + } + }; - abortSignalCleanupFunction = checkAndRegisterWithAbortSignal((err) => { - cleanupBeforeResolveOrReject(); - reject(err); - }, args.abortSignal); + const onClose: OnAmqpEventAsPromise = async (context: EventContext) => { + const type = context.session?.error != null ? "session_closed" : "receiver_closed"; + const error = context.session?.error || context.receiver?.error; + + if (error) { + logger.logError(error, `${loggingPrefix} '${type}' event occurred. The associated error`); + } + }; + + // Action to be performed on the "receiver_drained" event. + const onReceiveDrain: OnAmqpEvent = () => { + receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain); + receiver.drain = false; logger.verbose( - `${loggingPrefix} Adding credit for receiving ${args.maxMessageCount} messages.` + `${loggingPrefix} Drained, resolving receiveMessages() with ${brokeredMessages.length} messages.` ); - // By adding credit here, we let the service know that at max we can handle `maxMessageCount` - // number of messages concurrently. We will return the user an array of messages that can - // be of size upto maxMessageCount. Then the user needs to accordingly dispose - // (complete/abandon/defer/deadletter) the messages from the array. - receiver.addCredit(args.maxMessageCount); + resolveAfterPendingMessageCallbacks(brokeredMessages); + }; + + cleanupBeforeResolveOrReject = (): void => { + if (receiver != null) { + receiver.removeListener(ReceiverEvents.receiverError, onError); + receiver.removeListener(ReceiverEvents.message, onReceiveMessage); + receiver.session.removeListener(SessionEvents.sessionError, onError); + receiver.removeListener(ReceiverEvents.receiverClose, onClose); + receiver.session.removeListener(SessionEvents.sessionClose, onClose); + receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain); + } + + if (totalWaitTimer) { + clearTimeout(totalWaitTimer); + } + + if (abortSignalCleanupFunction) { + abortSignalCleanupFunction(); + } + abortSignalCleanupFunction = undefined; + }; + + abortSignalCleanupFunction = checkAndRegisterWithAbortSignal((err) => { + reject(err); + }, args.abortSignal); + + logger.verbose( + `${loggingPrefix} Adding credit for receiving ${args.maxMessageCount} messages.` + ); + // By adding credit here, we let the service know that at max we can handle `maxMessageCount` + // number of messages concurrently. We will return the user an array of messages that can + // be of size upto maxMessageCount. Then the user needs to accordingly dispose + // (complete/abandon/defer/deadletter) the messages from the array. + receiver.addCredit(args.maxMessageCount); + + logger.verbose( + `${loggingPrefix} Setting the wait timer for ${args.maxWaitTimeInMs} milliseconds.` + ); + + totalWaitTimer = setTimeout(() => { logger.verbose( - `${loggingPrefix} Setting the wait timer for ${args.maxWaitTimeInMs} milliseconds.` + `${loggingPrefix} Batching, waited for max wait time ${args.maxWaitTimeInMs} milliseconds.` ); + finalAction(); + }, args.maxWaitTimeInMs); - totalWaitTimer = setTimeout(() => { - logger.verbose( - `${loggingPrefix} Batching, waited for max wait time ${args.maxWaitTimeInMs} milliseconds.` - ); - finalAction(); - }, args.maxWaitTimeInMs); - - receiver.on(ReceiverEvents.message, onReceiveMessage); - receiver.on(ReceiverEvents.receiverError, onError); - receiver.on(ReceiverEvents.receiverClose, onClose); - receiver.on(ReceiverEvents.receiverDrained, onReceiveDrain); + receiver.on(ReceiverEvents.message, onReceiveMessage); + receiver.on(ReceiverEvents.receiverError, onError); + receiver.on(ReceiverEvents.receiverClose, onClose); + receiver.on(ReceiverEvents.receiverDrained, onReceiveDrain); - receiver.session.on(SessionEvents.sessionError, onError); - receiver.session.on(SessionEvents.sessionClose, onClose); - }); + receiver.session.on(SessionEvents.sessionError, onError); + receiver.session.on(SessionEvents.sessionClose, onClose); } } diff --git a/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts index 73aaddf9bdff..3b88c70044fe 100644 --- a/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts @@ -635,7 +635,7 @@ describe("BatchingReceiver unit tests", () => { }); it("batchingReceiverLite.close() (ie, no error) just shuts down the current operation with no error", async () => { - const { fakeRheaReceiver, receiveIsReady } = createFakeReceiver(); + const { fakeRheaReceiver } = createFakeReceiver(); const receiver = new BatchingReceiverLite( createConnectionContextForTests(), @@ -648,21 +648,40 @@ describe("BatchingReceiver unit tests", () => { assert.notExists(receiver["_closeHandler"]); - const receiveMessagesPromise = receiver.receiveMessages({ - maxMessageCount: 1, - maxTimeAfterFirstMessageInMs: 1, - maxWaitTimeInMs: 1 - }); + let resolveWasCalled = false; + let rejectWasCalled = false; + + receiver["_receiveMessagesImpl"]( + (await receiver["_getCurrentReceiver"]())!, + { + maxMessageCount: 1, + maxTimeAfterFirstMessageInMs: 1, + maxWaitTimeInMs: 1 + }, + () => { + resolveWasCalled = true; + }, + () => { + rejectWasCalled = true; + } + ); - await receiveIsReady; assert.exists(receiver["_closeHandler"]); + assert.isFalse(resolveWasCalled); + assert.isFalse(rejectWasCalled); + + receiver.close(); - await receiver.close(); + // these are still false because we used setTimeout() (and we're using sinon) + // so the clock is "frozen" + assert.isFalse(resolveWasCalled); + assert.isFalse(rejectWasCalled); - const results = await receiveMessagesPromise; + // now unfreeze it (without ticking time forward, just running whatever is eligible _now_) + clock.tick(0); - // TODO: let's have a few messages in here. - assert.isEmpty(results); + assert.isTrue(resolveWasCalled); + assert.isFalse(rejectWasCalled); }); }); diff --git a/sdk/servicebus/service-bus/test/internal/tracing.spec.ts b/sdk/servicebus/service-bus/test/internal/tracing.spec.ts index 16972adfda6c..640c3fa21691 100644 --- a/sdk/servicebus/service-bus/test/internal/tracing.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/tracing.spec.ts @@ -63,8 +63,8 @@ describe("Tracing tests", () => { br["_createAndEndProcessingSpan"] = createSpanStub; - br["_receiveMessagesImpl"] = async () => { - return ([ + br["_receiveMessagesImpl"] = (_receiver, _args, resolve, _reject) => { + resolve(([ { applicationProperties: { "Diagnostic-Id": "diagnostic id 1" @@ -75,7 +75,7 @@ describe("Tracing tests", () => { "Diagnostic-Id": "diagnostic id 2" } } - ] as any) as ServiceBusMessageImpl[]; + ] as any) as ServiceBusMessageImpl[]); }; await br.receiveMessages({