diff --git a/sdk/eventhub/event-hubs/CHANGELOG.md b/sdk/eventhub/event-hubs/CHANGELOG.md index 080b5ed54afd..a7892ce6d77e 100644 --- a/sdk/eventhub/event-hubs/CHANGELOG.md +++ b/sdk/eventhub/event-hubs/CHANGELOG.md @@ -2,6 +2,12 @@ ## 5.2.1 (Unreleased) +- Fixes issue [#8584](https://github.com/Azure/azure-sdk-for-js/issues/8584) + where attempting to create AMQP links when the AMQP connection was in the + process of closing resulted in a `TypeError` in an uncaught exception. + ([PR 8884](https://github.com/Azure/azure-sdk-for-js/pull/8884)) +- Fixes reconnection issues by creating a new connection object rather than re-using the existing one. ([PR 8884](https://github.com/Azure/azure-sdk-for-js/pull/8884)) + ### Tracing updates: Tracing functionality is still in preview status and the APIs may have breaking diff --git a/sdk/eventhub/event-hubs/package.json b/sdk/eventhub/event-hubs/package.json index d3a3f4aaa8bf..4bd1b31696a1 100644 --- a/sdk/eventhub/event-hubs/package.json +++ b/sdk/eventhub/event-hubs/package.json @@ -81,7 +81,7 @@ }, "dependencies": { "@azure/abort-controller": "^1.0.0", - "@azure/core-amqp": "^1.1.1", + "@azure/core-amqp": "^1.1.3", "@azure/core-asynciterator-polyfill": "^1.0.0", "@azure/core-tracing": "1.0.0-preview.8", "@azure/logger": "^1.0.0", diff --git a/sdk/eventhub/event-hubs/src/connectionContext.ts b/sdk/eventhub/event-hubs/src/connectionContext.ts index 12c7d98aa22b..6462361d8b8c 100644 --- a/sdk/eventhub/event-hubs/src/connectionContext.ts +++ b/sdk/eventhub/event-hubs/src/connectionContext.ts @@ -1,6 +1,9 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. +/* eslint-disable @azure/azure-sdk/ts-no-namespaces */ +/* eslint-disable no-inner-declarations */ + import { logger } from "./log"; import { getRuntimeInfo } from "./util/runtimeInfo"; import { packageJsonInfo } from "./util/constants"; @@ -16,7 +19,7 @@ import { } from "@azure/core-amqp"; import { ManagementClient, ManagementClientOptions } from "./managementClient"; import { EventHubClientOptions } from "./models/public"; -import { ConnectionEvents, Dictionary, EventContext, OnAmqpEvent } from "rhea-promise"; +import { Connection, ConnectionEvents, Dictionary, EventContext, OnAmqpEvent } from "rhea-promise"; /** * @internal @@ -48,6 +51,41 @@ export interface ConnectionContext extends ConnectionContextBase { * the underlying amqp connection for the EventHub Client. */ managementSession?: ManagementClient; + /** + * Function returning a promise that resolves once the connectionContext is ready to open an AMQP link. + * ConnectionContext will be ready to open an AMQP link when: + * - The AMQP connection is already open on both sides. + * - The AMQP connection has been closed or disconnected. In this case, a new AMQP connection is expected + * to be created first. + * An AMQP link cannot be opened if the AMQP connection + * is in the process of closing or disconnecting. + */ + readyToOpenLink(): Promise; +} + +/** + * Describes the members on the ConnectionContext that are only + * used by it internally. + * @ignore + * @internal + */ +export interface ConnectionContextInternalMembers extends ConnectionContext { + /** + * Indicates whether the connection is in the process of closing. + * When this returns `true`, a `disconnected` event will be received + * after the connection is closed. + * + */ + isConnectionClosing(): boolean; + /** + * Resolves once the context's connection emits a `disconnected` event. + */ + waitForDisconnectedEvent(): Promise; + /** + * Resolves once the connection has finished being reset. + * Connections are reset as part of reacting to a `disconnected` event. + */ + waitForConnectionReset(): Promise; } /** @@ -59,6 +97,26 @@ export interface ConnectionContextOptions extends EventHubClientOptions { managementSessionAudience?: string; } +/** + * Helper type to get the names of all the functions on an object. + */ +type FunctionPropertyNames = { [K in keyof T]: T[K] extends Function ? K : never }[keyof T]; +/** + * Helper type to get the types of all the functions on an object. + */ +type FunctionProperties = Pick>; +/** + * Helper type to get the types of all the functions on ConnectionContext + * and the internal methods from ConnectionContextInternalMembers. + * Note that this excludes the functions that ConnectionContext inherits. + * Each function also has its `this` type set as `ConnectionContext`. + */ +type ConnectionContextMethods = Omit< + FunctionProperties, + FunctionPropertyNames +> & + ThisType; + /** * @internal * @ignore @@ -118,6 +176,47 @@ export namespace ConnectionContext { }; connectionContext.managementSession = new ManagementClient(connectionContext, mOptions); + let waitForDisconnectResolve: () => void; + let waitForDisconnectPromise: Promise | undefined; + + Object.assign(connectionContext, { + isConnectionClosing() { + // When the connection is not open, but the remote end is open, + // then the rhea connection is in the process of terminating. + return Boolean(!this.connection.isOpen() && this.connection.isRemoteOpen()); + }, + async readyToOpenLink() { + // Check that the connection isn't in the process of closing. + // This can happen when the idle timeout has been reached but + // the underlying socket is waiting to be destroyed. + if (this.isConnectionClosing()) { + // Wait for the disconnected event that indicates the underlying socket has closed. + await this.waitForDisconnectedEvent(); + } + // Check if the connection is currently in the process of disconnecting. + if (waitForDisconnectPromise) { + // Wait for the connection to be reset. + await this.waitForConnectionReset(); + } + }, + waitForDisconnectedEvent() { + return new Promise((resolve) => { + logger.verbose( + `[${this.connectionId}] Attempting to reinitialize connection` + + ` but the connection is in the process of closing.` + + ` Waiting for the disconnect event before continuing.` + ); + this.connection.once(ConnectionEvents.disconnected, resolve); + }); + }, + waitForConnectionReset() { + if (waitForDisconnectPromise) { + return waitForDisconnectPromise; + } + return Promise.resolve(); + } + }); + // Define listeners to be added to the connection object for // "connection_open" and "connection_error" events. const onConnectionOpen: OnAmqpEvent = () => { @@ -129,7 +228,14 @@ export namespace ConnectionContext { ); }; - const disconnected: OnAmqpEvent = async (context: EventContext) => { + const onDisconnected: OnAmqpEvent = async (context: EventContext) => { + if (waitForDisconnectPromise) { + return; + } + waitForDisconnectPromise = new Promise((resolve) => { + waitForDisconnectResolve = resolve; + }); + logger.verbose( "[%s] 'disconnected' event occurred on the amqp connection.", connectionContext.connection.id @@ -169,39 +275,33 @@ export namespace ConnectionContext { connectionContext.connection.removeAllSessions(); // Close the cbs session to ensure all the event handlers are released. - await connectionContext.cbsSession.close(); + await connectionContext.cbsSession.close().catch(() => { + /* error already logged, swallow it here */ + }); // Close the management session to ensure all the event handlers are released. - await connectionContext.managementSession!.close(); + await connectionContext.managementSession!.close().catch(() => { + /* error already logged, swallow it here */ + }); // Close all senders and receivers to ensure clean up of timers & other resources. if (state.numSenders || state.numReceivers) { for (const senderName of Object.keys(connectionContext.senders)) { const sender = connectionContext.senders[senderName]; - if (!sender.isConnecting) { - await sender.close().catch((err) => { - logger.verbose( - "[%s] Error when closing sender [%s] after disconnected event: %O", - connectionContext.connection.id, - senderName, - err - ); - }); - } + await sender.close().catch(() => { + /* error already logged, swallow it here */ + }); } for (const receiverName of Object.keys(connectionContext.receivers)) { const receiver = connectionContext.receivers[receiverName]; - if (!receiver.isConnecting) { - await receiver.close().catch((err) => { - logger.verbose( - "[%s] Error when closing sender [%s] after disconnected event: %O", - connectionContext.connection.id, - receiverName, - err - ); - }); - } + await receiver.close().catch(() => { + /* error already logged, swallow it here */ + }); } } + + await refreshConnection(connectionContext); + waitForDisconnectResolve(); + waitForDisconnectPromise = undefined; }; const protocolError: OnAmqpEvent = async (context: EventContext) => { @@ -248,11 +348,47 @@ export namespace ConnectionContext { } }; - // Add listeners on the connection object. - connectionContext.connection.on(ConnectionEvents.connectionOpen, onConnectionOpen); - connectionContext.connection.on(ConnectionEvents.disconnected, disconnected); - connectionContext.connection.on(ConnectionEvents.protocolError, protocolError); - connectionContext.connection.on(ConnectionEvents.error, error); + function addConnectionListeners(connection: Connection) { + // Add listeners on the connection object. + connection.on(ConnectionEvents.connectionOpen, onConnectionOpen); + connection.on(ConnectionEvents.disconnected, onDisconnected); + connection.on(ConnectionEvents.protocolError, protocolError); + connection.on(ConnectionEvents.error, error); + } + + function cleanConnectionContext(connectionContext: ConnectionContext) { + // Remove listeners from the connection object. + connectionContext.connection.removeListener( + ConnectionEvents.connectionOpen, + onConnectionOpen + ); + connectionContext.connection.removeListener(ConnectionEvents.disconnected, onDisconnected); + connectionContext.connection.removeListener(ConnectionEvents.protocolError, protocolError); + connectionContext.connection.removeListener(ConnectionEvents.error, error); + // Close the connection + return connectionContext.connection.close(); + } + + async function refreshConnection(connectionContext: ConnectionContext) { + const originalConnectionId = connectionContext.connectionId; + try { + await cleanConnectionContext(connectionContext); + } catch (err) { + logger.verbose( + `[${connectionContext.connectionId}] There was an error closing the connection before reconnecting: %O`, + err + ); + } + + // Create a new connection, id, locks, and cbs client. + connectionContext.refreshConnection(); + addConnectionListeners(connectionContext.connection); + logger.verbose( + `The connection "${originalConnectionId}" has been updated to "${connectionContext.connectionId}".` + ); + } + + addConnectionListeners(connectionContext.connection); logger.verbose("[%s] Created connection context successfully.", connectionContext.connectionId); return connectionContext; diff --git a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts index 47d3f5feb8b2..1a48c5b859bd 100644 --- a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts +++ b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts @@ -358,15 +358,22 @@ export class EventHubReceiver extends LinkEntity { * @returns */ async close(): Promise { - this.clearHandlers(); + try { + this.clearHandlers(); - if (!this._receiver) { - return; - } + if (!this._receiver) { + return; + } - const receiverLink = this._receiver; - this._deleteFromCache(); - await this._closeLink(receiverLink); + const receiverLink = this._receiver; + this._deleteFromCache(); + await this._closeLink(receiverLink); + } catch (err) { + const msg = `[${this._context.connectionId}] An error occurred while closing receiver ${this.name}: ${err}`; + logger.warning(msg); + logErrorStackTrace(err); + throw err; + } } /** @@ -511,6 +518,9 @@ export class EventHubReceiver extends LinkEntity { try { if (!this.isOpen() && !this.isConnecting) { this.isConnecting = true; + + // Wait for the connectionContext to be ready to open the link. + await this._context.readyToOpenLink(); await this._negotiateClaim(); const receiverOptions: CreateReceiverOptions = { diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index b5ef8b64fdaf..d7a7337627ee 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -171,15 +171,22 @@ export class EventHubSender extends LinkEntity { * @returns Promise */ async close(): Promise { - if (this._sender) { - logger.info( - "[%s] Closing the Sender for the entity '%s'.", - this._context.connectionId, - this._context.config.entityPath - ); - const senderLink = this._sender; - this._deleteFromCache(); - await this._closeLink(senderLink); + try { + if (this._sender) { + logger.info( + "[%s] Closing the Sender for the entity '%s'.", + this._context.connectionId, + this._context.config.entityPath + ); + const senderLink = this._sender; + this._deleteFromCache(); + await this._closeLink(senderLink); + } + } catch (err) { + const msg = `[${this._context.connectionId}] An error occurred while closing sender ${this.name}: ${err}`; + logger.warning(msg); + logErrorStackTrace(err); + throw err; } } @@ -561,6 +568,9 @@ export class EventHubSender extends LinkEntity { try { if (!this.isOpen() && !this.isConnecting) { this.isConnecting = true; + + // Wait for the connectionContext to be ready to open the link. + await this._context.readyToOpenLink(); await this._negotiateClaim(); logger.verbose( diff --git a/sdk/eventhub/event-hubs/src/managementClient.ts b/sdk/eventhub/event-hubs/src/managementClient.ts index 6591f4366287..bad5a4a7f599 100644 --- a/sdk/eventhub/event-hubs/src/managementClient.ts +++ b/sdk/eventhub/event-hubs/src/managementClient.ts @@ -255,10 +255,12 @@ export class ManagementClient extends LinkEntity { */ async close(): Promise { try { + // Always clear the timeout, as the isOpen check may report + // false without ever having cleared the timeout otherwise. + clearTimeout(this._tokenRenewalTimer as NodeJS.Timer); if (this._isMgmtRequestResponseLinkOpen()) { const mgmtLink = this._mgmtReqResLink; this._mgmtReqResLink = undefined; - clearTimeout(this._tokenRenewalTimer as NodeJS.Timer); await mgmtLink!.close(); logger.info("Successfully closed the management session."); } @@ -273,6 +275,8 @@ export class ManagementClient extends LinkEntity { private async _init(): Promise { try { if (!this._isMgmtRequestResponseLinkOpen()) { + // Wait for the connectionContext to be ready to open the link. + await this._context.readyToOpenLink(); await this._negotiateClaim(); const rxopt: ReceiverOptions = { source: { address: this.address }, @@ -289,7 +293,9 @@ export class ManagementClient extends LinkEntity { ); } }; - const sropt: SenderOptions = { target: { address: this.address } }; + const sropt: SenderOptions = { + target: { address: this.address } + }; logger.verbose( "[%s] Creating sender/receiver links on a session for $management endpoint with " + "srOpts: %o, receiverOpts: %O.", diff --git a/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts b/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts new file mode 100644 index 000000000000..5e6edb94f0b6 --- /dev/null +++ b/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts @@ -0,0 +1,134 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import chai from "chai"; +const should = chai.should(); +import chaiAsPromised from "chai-as-promised"; +chai.use(chaiAsPromised); +import { EnvVarKeys, getEnvVars } from "../utils/testUtils"; +import { EventHubClient } from "../../src/impl/eventHubClient"; +import { EventHubConsumerClient, EventHubProducerClient } from "../../src/index"; +const env = getEnvVars(); + +describe("disconnected", function() { + let client: EventHubClient; + const service = { + connectionString: env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], + path: env[EnvVarKeys.EVENTHUB_NAME] + }; + before("validate environment", function(): void { + should.exist( + env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], + "define EVENTHUB_CONNECTION_STRING in your environment before running integration tests." + ); + should.exist( + env[EnvVarKeys.EVENTHUB_NAME], + "define EVENTHUB_NAME in your environment before running integration tests." + ); + }); + + afterEach("close the connection", async function(): Promise { + if (client) { + await client.close(); + } + }); + + describe("HubRuntime", function() { + it("operations work after disconnect", async () => { + client = new EventHubClient(service.connectionString, service.path); + const clientConnectionContext = client["_context"]; + + await client.getPartitionIds({}); + const originalConnectionId = clientConnectionContext.connectionId; + + // Trigger a disconnect on the underlying connection. + clientConnectionContext.connection["_connection"].idle(); + + const partitionIds = await client.getPartitionIds({}); + const newConnectionId = clientConnectionContext.connectionId; + + should.not.equal(originalConnectionId, newConnectionId); + partitionIds.length.should.greaterThan(0, "Invalid number of partition ids returned."); + }); + }); + + describe("Receiver", function() { + it("should receive after a disconnect", async () => { + client = new EventHubClient(service.connectionString, service.path); + const receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, "0", { + sequenceNumber: 0 + }); + const clientConnectionContext = receiver["_context"]; + + await receiver.receiveBatch(1, 1); + const originalConnectionId = clientConnectionContext.connectionId; + + // Trigger a disconnect on the underlying connection. + clientConnectionContext.connection["_connection"].idle(); + + await receiver.receiveBatch(1, 1); + const newConnectionId = clientConnectionContext.connectionId; + + should.not.equal(originalConnectionId, newConnectionId); + + await receiver.close(); + }); + }); + + describe("Sender", function() { + it("should send after a disconnect", async () => { + const client = new EventHubProducerClient(service.connectionString, service.path); + const clientConnectionContext = client["_client"]["_context"]; + + await client.sendBatch([{ body: "test" }]); + const originalConnectionId = clientConnectionContext.connectionId; + + // Trigger a disconnect on the underlying connection. + clientConnectionContext.connection["_connection"].idle(); + + await client.sendBatch([{ body: "test2" }]); + const newConnectionId = clientConnectionContext.connectionId; + + should.not.equal(originalConnectionId, newConnectionId); + + await client.close(); + }); + + it("should not throw an uncaught exception", async () => { + const client = new EventHubProducerClient(service.connectionString, service.path); + const clientConnectionContext = client["_client"]["_context"]; + + // Send an event to open the connection. + await client.sendBatch([{ body: "test" }]); + const originalConnectionId = clientConnectionContext.connectionId; + + // We need to dig deep into the internals to get the awaitable sender so that . + const awaitableSender = client["_producersMap"].get("")!["_eventHubSender"]!["_sender"]!; + + let thirdSend: Promise; + // Change the timeout on the awaitableSender so it forces an OperationTimeoutError + awaitableSender.sendTimeoutInSeconds = 0; + // Ensure that the connection will disconnect, and another sendBatch occurs while a sendBatch is in-flight. + setTimeout(() => { + // Trigger a disconnect on the underlying connection while the `sendBatch` is in flight. + clientConnectionContext.connection["_connection"].idle(); + // Triggering another sendBatch immediately after an idle + // used to cause the rhea connection remote state to be cleared. + // This caused the in-flight sendBatch to throw an uncaught error + // if it timed out. + thirdSend = client.sendBatch([{ body: "test3" }]); + }, 0); + + await client.sendBatch([{ body: "test2" }]); + const newConnectionId = clientConnectionContext.connectionId; + + should.not.equal(originalConnectionId, newConnectionId); + + // ensure the sendBatch from the setTimeout succeeded. + // Wait for the connectionContext to be ready for opening. + await thirdSend!; + + await client.close(); + }); + }); +}); diff --git a/sdk/eventhub/event-hubs/test/receiver.spec.ts b/sdk/eventhub/event-hubs/test/receiver.spec.ts index 150d684cac25..a3b8fb589999 100644 --- a/sdk/eventhub/event-hubs/test/receiver.spec.ts +++ b/sdk/eventhub/event-hubs/test/receiver.spec.ts @@ -261,13 +261,13 @@ describe("EventHub Receiver", function(): void { describe("in streaming mode", function(): void { it("should receive messages correctly", async function(): Promise { const partitionId = partitionIds[0]; - const time = Date.now(); + const pInfo = await client.getPartitionProperties(partitionId); // send a message that can be received await producerClient.sendBatch([{ body: "receive behaves correctly" }], { partitionId }); receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, { - enqueuedOn: time + sequenceNumber: pInfo.lastEnqueuedSequenceNumber }); const received: ReceivedEventData[] = await new Promise((resolve, reject) => { diff --git a/sdk/servicebus/service-bus/package.json b/sdk/servicebus/service-bus/package.json index 939bcc83cbb4..267e804bccfd 100644 --- a/sdk/servicebus/service-bus/package.json +++ b/sdk/servicebus/service-bus/package.json @@ -86,7 +86,7 @@ }, "dependencies": { "@azure/abort-controller": "^1.0.0", - "@azure/core-amqp": "^1.1.1", + "@azure/core-amqp": "^1.1.3", "@azure/core-asynciterator-polyfill": "^1.0.0", "@azure/core-http": "^1.1.1", "@azure/core-tracing": "1.0.0-preview.8",