From aaaaca2fe8e95c550b7f5ab20b8974c251ff9b36 Mon Sep 17 00:00:00 2001 From: chradek Date: Tue, 12 May 2020 17:13:03 -0700 Subject: [PATCH 01/12] [event-hubs] fix uncaught TypeError --- sdk/eventhub/event-hubs/CHANGELOG.md | 3 ++ .../event-hubs/src/eventHubReceiver.ts | 17 +++++++++ sdk/eventhub/event-hubs/src/eventHubSender.ts | 17 +++++++++ .../event-hubs/src/util/connectionUtils.ts | 35 +++++++++++++++++++ 4 files changed, 72 insertions(+) create mode 100644 sdk/eventhub/event-hubs/src/util/connectionUtils.ts diff --git a/sdk/eventhub/event-hubs/CHANGELOG.md b/sdk/eventhub/event-hubs/CHANGELOG.md index 080b5ed54afd..3116a47366fe 100644 --- a/sdk/eventhub/event-hubs/CHANGELOG.md +++ b/sdk/eventhub/event-hubs/CHANGELOG.md @@ -2,6 +2,9 @@ ## 5.2.1 (Unreleased) +- Fixes issue [#8584](https://github.com/Azure/azure-sdk-for-js/issues/8584) + where a `TypeError` was sometimes thrown as an uncaught exception. + ### Tracing updates: Tracing functionality is still in preview status and the APIs may have breaking diff --git a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts index 448aa96f3fca..df6d03a546a1 100644 --- a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts +++ b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts @@ -17,6 +17,7 @@ import { ConnectionContext } from "./connectionContext"; import { LinkEntity } from "./linkEntity"; import { EventPosition, getEventPositionFilter } from "./eventPosition"; import { AbortSignalLike, AbortError } from "@azure/abort-controller"; +import { isConnectionClosing, waitForConnectionDisconnected } from "./util/connectionUtils"; /** * @ignore @@ -511,6 +512,22 @@ export class EventHubReceiver extends LinkEntity { try { if (!this.isOpen() && !this.isConnecting) { this.isConnecting = true; + + // Check that the connection isn't in the process of closing. + // This can happen when the idle timeout has been reached but + // the underlying socket is waiting to be destroyed. + if (isConnectionClosing(this._context)) { + // Wait for the disconnected event that indicates the underlying socket has closed. + await waitForConnectionDisconnected(this._context); + // Now that the previous connection has disconnected, + // create a new rhea connection object by refreshing. + const originalConnectionId = this._context.connectionId; + await this._context.refreshConnection(); + logger.verbose( + `The connection "${originalConnectionId}" has been updated to "${this._context.connectionId}".` + ); + } + await this._negotiateClaim(); const receiverOptions: CreateReceiverOptions = { diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index c8fb9f62dd1b..a5c3a75e6908 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -31,6 +31,7 @@ import { SendOptions } from "./models/public"; import { getRetryAttemptTimeoutInMs } from "./util/retries"; import { AbortSignalLike, AbortError } from "@azure/abort-controller"; import { EventDataBatch, isEventDataBatch } from "./eventDataBatch"; +import { isConnectionClosing, waitForConnectionDisconnected } from "./util/connectionUtils"; /** * Describes the EventHubSender that will send event data to EventHub. @@ -561,6 +562,22 @@ export class EventHubSender extends LinkEntity { try { if (!this.isOpen() && !this.isConnecting) { this.isConnecting = true; + + // Check that the connection isn't in the process of closing. + // This can happen when the idle timeout has been reached but + // the underlying socket is waiting to be destroyed. + if (isConnectionClosing(this._context)) { + // Wait for the disconnected event that indicates the underlying socket has closed. + await waitForConnectionDisconnected(this._context); + // Now that the previous connection has disconnected, + // create a new rhea connection object by refreshing. + const originalConnectionId = this._context.connectionId; + await this._context.refreshConnection(); + logger.verbose( + `The connection "${originalConnectionId}" has been updated to "${this._context.connectionId}".` + ); + } + await this._negotiateClaim(); logger.verbose( diff --git a/sdk/eventhub/event-hubs/src/util/connectionUtils.ts b/sdk/eventhub/event-hubs/src/util/connectionUtils.ts new file mode 100644 index 000000000000..066ff8cfa389 --- /dev/null +++ b/sdk/eventhub/event-hubs/src/util/connectionUtils.ts @@ -0,0 +1,35 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { ConnectionEvents } from "rhea-promise"; +import { ConnectionContext } from "../connectionContext"; +import { logger } from "../log"; + +/** + * Checks if the connection on a ConnectionContext is closing. + * @internal + * @ignore + * @param context + */ +export function isConnectionClosing(context: ConnectionContext): boolean { + // When the connection is not open, but the remote end is open, + // then the rhea connection is in the process of terminating. + return Boolean(!context.connection.isOpen() && context.connection.isRemoteOpen()); +} + +/** + * Resolves once the context's connection emits a 'disconnected' event. + * @internal + * @ignore + * @param context + */ +export async function waitForConnectionDisconnected(context: ConnectionContext): Promise { + return new Promise((resolve) => { + logger.verbose( + `[${context.connectionId}] Attempting to reinitialize connection` + + ` but the connection is in the process of closing.` + + ` Waiting for the disconnect event before continuing.` + ); + context.connection.once(ConnectionEvents.disconnected, resolve); + }); +} From 599617b78bac0298b7ee66b169e71d50f44cfa97 Mon Sep 17 00:00:00 2001 From: chradek Date: Wed, 13 May 2020 22:22:21 -0700 Subject: [PATCH 02/12] [event-hubs] create new rhea connection on disconnected --- .../event-hubs/src/connectionContext.ts | 134 ++++++++++++++---- .../event-hubs/src/eventHubReceiver.ts | 14 +- sdk/eventhub/event-hubs/src/eventHubSender.ts | 14 +- .../event-hubs/src/managementClient.ts | 15 +- .../event-hubs/test/hubruntime.spec.ts | 18 +++ sdk/eventhub/event-hubs/test/receiver.spec.ts | 24 ++++ sdk/eventhub/event-hubs/test/sender.spec.ts | 52 +++++++ 7 files changed, 224 insertions(+), 47 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/connectionContext.ts b/sdk/eventhub/event-hubs/src/connectionContext.ts index 6fc44dd7f98d..84afb1e9ea5c 100644 --- a/sdk/eventhub/event-hubs/src/connectionContext.ts +++ b/sdk/eventhub/event-hubs/src/connectionContext.ts @@ -1,3 +1,5 @@ +/* eslint-disable @azure/azure-sdk/ts-no-namespaces */ +/* eslint-disable no-inner-declarations */ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. @@ -16,7 +18,7 @@ import { } from "@azure/core-amqp"; import { ManagementClient, ManagementClientOptions } from "./managementClient"; import { EventHubClientOptions } from "./models/public"; -import { Dictionary, OnAmqpEvent, EventContext, ConnectionEvents } from "rhea-promise"; +import { Dictionary, OnAmqpEvent, EventContext, ConnectionEvents, Connection } from "rhea-promise"; /** * @internal @@ -48,6 +50,11 @@ export interface ConnectionContext extends ConnectionContextBase { * the underlying amqp connection for the EventHub Client. */ managementSession?: ManagementClient; + + isConnectionClosing(): boolean; + + isDisconnecting: boolean; + waitForConnectionReset(): Promise; } /** @@ -117,6 +124,20 @@ export namespace ConnectionContext { audience: options.managementSessionAudience }; connectionContext.managementSession = new ManagementClient(connectionContext, mOptions); + connectionContext.isConnectionClosing = function() { + // When the connection is not open, but the remote end is open, + // then the rhea connection is in the process of terminating. + return Boolean(!this.connection.isOpen() && this.connection.isRemoteOpen()); + }; + connectionContext.isDisconnecting = false; + let waitForResolve: () => void; + let waitForPromise: Promise | undefined; + connectionContext.waitForConnectionReset = function() { + if (this.isDisconnecting && waitForPromise) { + return waitForPromise; + } + return Promise.resolve(); + }; // Define listeners to be added to the connection object for // "connection_open" and "connection_error" events. @@ -130,6 +151,14 @@ export namespace ConnectionContext { }; const disconnected: OnAmqpEvent = async (context: EventContext) => { + if (connectionContext.isDisconnecting) { + return; + } + connectionContext.isDisconnecting = true; + waitForPromise = new Promise((resolve) => { + waitForResolve = resolve; + }); + logger.verbose( "[%s] 'disconnected' event occurred on the amqp connection.", connectionContext.connection.id @@ -169,39 +198,52 @@ export namespace ConnectionContext { connectionContext.connection.removeAllSessions(); // Close the cbs session to ensure all the event handlers are released. - await connectionContext.cbsSession.close(); + await connectionContext.cbsSession.close().catch((err) => { + logger.verbose( + "[%s] Error when closing cbsSession after disconnected event: %O", + connectionContext.connection.id, + err + ); + }); // Close the management session to ensure all the event handlers are released. - await connectionContext.managementSession!.close(); + await connectionContext.managementSession!.close().catch((err) => { + logger.verbose( + "[%s] Error when closing managementSession after disconnected event: %O", + connectionContext.connection.id, + err + ); + }); // Close all senders and receivers to ensure clean up of timers & other resources. if (state.numSenders || state.numReceivers) { for (const senderName of Object.keys(connectionContext.senders)) { const sender = connectionContext.senders[senderName]; - if (!sender.isConnecting) { - await sender.close().catch((err) => { - logger.verbose( - "[%s] Error when closing sender [%s] after disconnected event: %O", - connectionContext.connection.id, - senderName, - err - ); - }); - } + await sender.close().catch((err) => { + logger.verbose( + "[%s] Error when closing sender [%s] after disconnected event: %O", + connectionContext.connection.id, + senderName, + err + ); + }); } for (const receiverName of Object.keys(connectionContext.receivers)) { const receiver = connectionContext.receivers[receiverName]; - if (!receiver.isConnecting) { - await receiver.close().catch((err) => { - logger.verbose( - "[%s] Error when closing sender [%s] after disconnected event: %O", - connectionContext.connection.id, - receiverName, - err - ); - }); - } + await receiver.close().catch((err) => { + logger.verbose( + "[%s] Error when closing sender [%s] after disconnected event: %O", + connectionContext.connection.id, + receiverName, + err + ); + }); } } + + await refreshConnection(connectionContext); + waitForResolve(); + waitForPromise = undefined; + connectionContext.isDisconnecting = false; }; const protocolError: OnAmqpEvent = async (context: EventContext) => { @@ -248,11 +290,47 @@ export namespace ConnectionContext { } }; - // Add listeners on the connection object. - connectionContext.connection.on(ConnectionEvents.connectionOpen, onConnectionOpen); - connectionContext.connection.on(ConnectionEvents.disconnected, disconnected); - connectionContext.connection.on(ConnectionEvents.protocolError, protocolError); - connectionContext.connection.on(ConnectionEvents.error, error); + function addConnectionListeners(connection: Connection) { + // Add listeners on the connection object. + connection.on(ConnectionEvents.connectionOpen, onConnectionOpen); + connection.on(ConnectionEvents.disconnected, disconnected); + connection.on(ConnectionEvents.protocolError, protocolError); + connection.on(ConnectionEvents.error, error); + } + + function cleanConnectionContext(connectionContext: ConnectionContext) { + // Remove listeners from the connection object. + connectionContext.connection.removeListener( + ConnectionEvents.connectionOpen, + onConnectionOpen + ); + connectionContext.connection.removeListener(ConnectionEvents.disconnected, disconnected); + connectionContext.connection.removeListener(ConnectionEvents.protocolError, protocolError); + connectionContext.connection.removeListener(ConnectionEvents.error, error); + // Close the connection + return connectionContext.connection.close(); + } + + async function refreshConnection(connectionContext: ConnectionContext) { + const originalConnectionId = connectionContext.connectionId; + try { + await cleanConnectionContext(connectionContext); + } catch (err) { + logger.verbose( + `[${connectionContext.connectionId}] There was an error closing the connection before reconnecting: %O`, + err + ); + } + + // Create a new connection, id, locks, and cbs client. + connectionContext.refreshConnection(); + addConnectionListeners(connectionContext.connection); + logger.verbose( + `The connection "${originalConnectionId}" has been updated to "${connectionContext.connectionId}".` + ); + } + + addConnectionListeners(connectionContext.connection); logger.verbose("[%s] Created connection context successfully.", connectionContext.connectionId); return connectionContext; diff --git a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts index df6d03a546a1..94fbf62ac1ad 100644 --- a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts +++ b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts @@ -17,7 +17,7 @@ import { ConnectionContext } from "./connectionContext"; import { LinkEntity } from "./linkEntity"; import { EventPosition, getEventPositionFilter } from "./eventPosition"; import { AbortSignalLike, AbortError } from "@azure/abort-controller"; -import { isConnectionClosing, waitForConnectionDisconnected } from "./util/connectionUtils"; +import { waitForConnectionDisconnected } from "./util/connectionUtils"; /** * @ignore @@ -516,16 +516,12 @@ export class EventHubReceiver extends LinkEntity { // Check that the connection isn't in the process of closing. // This can happen when the idle timeout has been reached but // the underlying socket is waiting to be destroyed. - if (isConnectionClosing(this._context)) { + if (this._context.isConnectionClosing()) { // Wait for the disconnected event that indicates the underlying socket has closed. await waitForConnectionDisconnected(this._context); - // Now that the previous connection has disconnected, - // create a new rhea connection object by refreshing. - const originalConnectionId = this._context.connectionId; - await this._context.refreshConnection(); - logger.verbose( - `The connection "${originalConnectionId}" has been updated to "${this._context.connectionId}".` - ); + } + if (this._context.isDisconnecting) { + await this._context.waitForConnectionReset(); } await this._negotiateClaim(); diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index a5c3a75e6908..30d587bf1df1 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -31,7 +31,7 @@ import { SendOptions } from "./models/public"; import { getRetryAttemptTimeoutInMs } from "./util/retries"; import { AbortSignalLike, AbortError } from "@azure/abort-controller"; import { EventDataBatch, isEventDataBatch } from "./eventDataBatch"; -import { isConnectionClosing, waitForConnectionDisconnected } from "./util/connectionUtils"; +import { waitForConnectionDisconnected } from "./util/connectionUtils"; /** * Describes the EventHubSender that will send event data to EventHub. @@ -566,16 +566,12 @@ export class EventHubSender extends LinkEntity { // Check that the connection isn't in the process of closing. // This can happen when the idle timeout has been reached but // the underlying socket is waiting to be destroyed. - if (isConnectionClosing(this._context)) { + if (this._context.isConnectionClosing()) { // Wait for the disconnected event that indicates the underlying socket has closed. await waitForConnectionDisconnected(this._context); - // Now that the previous connection has disconnected, - // create a new rhea connection object by refreshing. - const originalConnectionId = this._context.connectionId; - await this._context.refreshConnection(); - logger.verbose( - `The connection "${originalConnectionId}" has been updated to "${this._context.connectionId}".` - ); + } + if (this._context.isDisconnecting) { + await this._context.waitForConnectionReset(); } await this._negotiateClaim(); diff --git a/sdk/eventhub/event-hubs/src/managementClient.ts b/sdk/eventhub/event-hubs/src/managementClient.ts index a967b827ed37..7381ab755a55 100644 --- a/sdk/eventhub/event-hubs/src/managementClient.ts +++ b/sdk/eventhub/event-hubs/src/managementClient.ts @@ -28,6 +28,7 @@ import { LinkEntity } from "./linkEntity"; import { logger, logErrorStackTrace } from "./log"; import { getRetryAttemptTimeoutInMs } from "./util/retries"; import { AbortSignalLike, AbortError } from "@azure/abort-controller"; +import { waitForConnectionDisconnected } from "./util/connectionUtils"; /** * Describes the runtime information of an Event Hub. */ @@ -255,10 +256,12 @@ export class ManagementClient extends LinkEntity { */ async close(): Promise { try { + // Always clear the timeout, as the isOpen check may report + // false without ever having cleared the timeout otherwise. + clearTimeout(this._tokenRenewalTimer as NodeJS.Timer); if (this._isMgmtRequestResponseLinkOpen()) { const mgmtLink = this._mgmtReqResLink; this._mgmtReqResLink = undefined; - clearTimeout(this._tokenRenewalTimer as NodeJS.Timer); await mgmtLink!.close(); logger.info("Successfully closed the management session."); } @@ -273,6 +276,16 @@ export class ManagementClient extends LinkEntity { private async _init(): Promise { try { if (!this._isMgmtRequestResponseLinkOpen()) { + // Check that the connection isn't in the process of closing. + // This can happen when the idle timeout has been reached but + // the underlying socket is waiting to be destroyed. + if (this._context.isConnectionClosing()) { + // Wait for the disconnected event that indicates the underlying socket has closed. + await waitForConnectionDisconnected(this._context); + } + if (this._context.isDisconnecting) { + await this._context.waitForConnectionReset(); + } await this._negotiateClaim(); const rxopt: ReceiverOptions = { source: { address: this.address }, diff --git a/sdk/eventhub/event-hubs/test/hubruntime.spec.ts b/sdk/eventhub/event-hubs/test/hubruntime.spec.ts index 14c6161e8ee9..2b7308506230 100644 --- a/sdk/eventhub/event-hubs/test/hubruntime.spec.ts +++ b/sdk/eventhub/event-hubs/test/hubruntime.spec.ts @@ -40,6 +40,24 @@ describe("RuntimeInformation", function(): void { }); } + describe("disconnected", function() { + it("should work after disconnect", async () => { + client = new EventHubClient(service.connectionString, service.path); + const clientConnectionContext = client["_context"]; + + await client.getPartitionIds({}); + const originalConnectionId = clientConnectionContext.connectionId; + + // Trigger a disconnect on the underlying connection. + clientConnectionContext.connection["_connection"].idle(); + + await client.getPartitionIds({}); + const newConnectionId = clientConnectionContext.connectionId; + + should.not.equal(originalConnectionId, newConnectionId); + }); + }); + describe("getPartitionIds", function(): void { it("returns an array of partition IDs", async function(): Promise { client = new EventHubClient(service.connectionString, service.path); diff --git a/sdk/eventhub/event-hubs/test/receiver.spec.ts b/sdk/eventhub/event-hubs/test/receiver.spec.ts index 4166eb5b6d37..465d3b142cf4 100644 --- a/sdk/eventhub/event-hubs/test/receiver.spec.ts +++ b/sdk/eventhub/event-hubs/test/receiver.spec.ts @@ -453,6 +453,30 @@ describe("EventHub Receiver", function(): void { }); describe("in batch mode", function(): void { + describe("disconnected", function() { + it("should receive after a disconnect", async () => { + const client = new EventHubClient(service.connectionString, service.path); + const receiver = client.createConsumer(EventHubClient.defaultConsumerGroupName, "0", { + sequenceNumber: 0 + }); + const clientConnectionContext = receiver["_context"]; + + await receiver.receiveBatch(1, 1); + const originalConnectionId = clientConnectionContext.connectionId; + + // Trigger a disconnect on the underlying connection. + clientConnectionContext.connection["_connection"].idle(); + + await receiver.receiveBatch(1, 1); + const newConnectionId = clientConnectionContext.connectionId; + + should.not.equal(originalConnectionId, newConnectionId); + + await receiver.close(); + await client.close(); + }); + }); + it("should receive messages correctly", async function(): Promise { const partitionId = partitionIds[0]; receiver = client.createConsumer( diff --git a/sdk/eventhub/event-hubs/test/sender.spec.ts b/sdk/eventhub/event-hubs/test/sender.spec.ts index 768f8fff906a..555f3070cff4 100644 --- a/sdk/eventhub/event-hubs/test/sender.spec.ts +++ b/sdk/eventhub/event-hubs/test/sender.spec.ts @@ -1351,6 +1351,58 @@ describe("EventHub Sender", function(): void { }); }); + describe("disconnected", function(): void { + it("should send after a disconnect", async () => { + const client = new EventHubProducerClient(service.connectionString, service.path); + const clientConnectionContext = client["_client"]["_context"]; + + await client.sendBatch([{ body: "test" }]); + const originalConnectionId = clientConnectionContext.connectionId; + + // Trigger a disconnect on the underlying connection. + clientConnectionContext.connection["_connection"].idle(); + + await client.sendBatch([{ body: "test2" }]); + const newConnectionId = clientConnectionContext.connectionId; + + should.not.equal(originalConnectionId, newConnectionId); + + await client.close(); + }); + + it("should not throw an uncaught exception", async () => { + const client = new EventHubProducerClient(service.connectionString, service.path); + const clientConnectionContext = client["_client"]["_context"]; + + // Send an event to open the connection. + await client.sendBatch([{ body: "test" }]); + const originalConnectionId = clientConnectionContext.connectionId; + + // We need to dig deep into the internals to get the awaitable sender so that . + const awaitableSender = client["_producersMap"].get("")!["_eventHubSender"]!["_sender"]!; + + // Change the timeout on the awaitableSender so it forces an OperationTimeoutError + awaitableSender.sendTimeoutInSeconds = 0; + // Ensure that the connection will disconnect, and another sendBatch occurs while a sendBatch is in-flight. + setTimeout(() => { + // Trigger a disconnect on the underlying connection while the `sendBatch` is in flight. + clientConnectionContext.connection["_connection"].idle(); + // Triggering another sendBatch immediately after an idle + // used to cause the rhea connection remote state to be cleared. + // This caused the in-flight sendBatch to throw an uncaught error + // if it timed out. + client.sendBatch([{ body: "test3" }]); + }, 0); + + await client.sendBatch([{ body: "test2" }]); + const newConnectionId = clientConnectionContext.connectionId; + + should.not.equal(originalConnectionId, newConnectionId); + + await client.close(); + }); + }); + async function sendBatch( bodies: any[], partitionId: string, From a76d99272825b9d10b9f5f23a3fa4d0e4fa82070 Mon Sep 17 00:00:00 2001 From: chradek Date: Wed, 13 May 2020 22:46:09 -0700 Subject: [PATCH 03/12] make disconnect tests node only --- .../event-hubs/test/hubruntime.spec.ts | 18 --- .../event-hubs/test/node/disconnects.spec.ts | 128 ++++++++++++++++++ sdk/eventhub/event-hubs/test/receiver.spec.ts | 24 ---- sdk/eventhub/event-hubs/test/sender.spec.ts | 52 ------- 4 files changed, 128 insertions(+), 94 deletions(-) create mode 100644 sdk/eventhub/event-hubs/test/node/disconnects.spec.ts diff --git a/sdk/eventhub/event-hubs/test/hubruntime.spec.ts b/sdk/eventhub/event-hubs/test/hubruntime.spec.ts index 2b7308506230..14c6161e8ee9 100644 --- a/sdk/eventhub/event-hubs/test/hubruntime.spec.ts +++ b/sdk/eventhub/event-hubs/test/hubruntime.spec.ts @@ -40,24 +40,6 @@ describe("RuntimeInformation", function(): void { }); } - describe("disconnected", function() { - it("should work after disconnect", async () => { - client = new EventHubClient(service.connectionString, service.path); - const clientConnectionContext = client["_context"]; - - await client.getPartitionIds({}); - const originalConnectionId = clientConnectionContext.connectionId; - - // Trigger a disconnect on the underlying connection. - clientConnectionContext.connection["_connection"].idle(); - - await client.getPartitionIds({}); - const newConnectionId = clientConnectionContext.connectionId; - - should.not.equal(originalConnectionId, newConnectionId); - }); - }); - describe("getPartitionIds", function(): void { it("returns an array of partition IDs", async function(): Promise { client = new EventHubClient(service.connectionString, service.path); diff --git a/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts b/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts new file mode 100644 index 000000000000..4dbf5c654e74 --- /dev/null +++ b/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts @@ -0,0 +1,128 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import chai from "chai"; +const should = chai.should(); +import chaiAsPromised from "chai-as-promised"; +chai.use(chaiAsPromised); +import { EnvVarKeys, getEnvVars } from "../utils/testUtils"; +import { EventHubClient } from "../../src/impl/eventHubClient"; +import { EventHubProducerClient } from "../../src/index"; +const env = getEnvVars(); + +describe("disconnected", function() { + let client: EventHubClient; + const service = { + connectionString: env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], + path: env[EnvVarKeys.EVENTHUB_NAME] + }; + before("validate environment", function(): void { + should.exist( + env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], + "define EVENTHUB_CONNECTION_STRING in your environment before running integration tests." + ); + should.exist( + env[EnvVarKeys.EVENTHUB_NAME], + "define EVENTHUB_NAME in your environment before running integration tests." + ); + }); + + afterEach("close the connection", async function(): Promise { + if (client) { + await client.close(); + } + }); + + describe("HubRuntime", function() { + it("operations work after disconnect", async () => { + client = new EventHubClient(service.connectionString, service.path); + const clientConnectionContext = client["_context"]; + + await client.getPartitionIds({}); + const originalConnectionId = clientConnectionContext.connectionId; + + // Trigger a disconnect on the underlying connection. + clientConnectionContext.connection["_connection"].idle(); + + await client.getPartitionIds({}); + const newConnectionId = clientConnectionContext.connectionId; + + should.not.equal(originalConnectionId, newConnectionId); + }); + }); + + describe("Receiver", function() { + it("should receive after a disconnect", async () => { + client = new EventHubClient(service.connectionString, service.path); + const receiver = client.createConsumer(EventHubClient.defaultConsumerGroupName, "0", { + sequenceNumber: 0 + }); + const clientConnectionContext = receiver["_context"]; + + await receiver.receiveBatch(1, 1); + const originalConnectionId = clientConnectionContext.connectionId; + + // Trigger a disconnect on the underlying connection. + clientConnectionContext.connection["_connection"].idle(); + + await receiver.receiveBatch(1, 1); + const newConnectionId = clientConnectionContext.connectionId; + + should.not.equal(originalConnectionId, newConnectionId); + + await receiver.close(); + }); + }); + + describe("Sender", function() { + it("should send after a disconnect", async () => { + const client = new EventHubProducerClient(service.connectionString, service.path); + const clientConnectionContext = client["_client"]["_context"]; + + await client.sendBatch([{ body: "test" }]); + const originalConnectionId = clientConnectionContext.connectionId; + + // Trigger a disconnect on the underlying connection. + clientConnectionContext.connection["_connection"].idle(); + + await client.sendBatch([{ body: "test2" }]); + const newConnectionId = clientConnectionContext.connectionId; + + should.not.equal(originalConnectionId, newConnectionId); + + await client.close(); + }); + + it("should not throw an uncaught exception", async () => { + const client = new EventHubProducerClient(service.connectionString, service.path); + const clientConnectionContext = client["_client"]["_context"]; + + // Send an event to open the connection. + await client.sendBatch([{ body: "test" }]); + const originalConnectionId = clientConnectionContext.connectionId; + + // We need to dig deep into the internals to get the awaitable sender so that . + const awaitableSender = client["_producersMap"].get("")!["_eventHubSender"]!["_sender"]!; + + // Change the timeout on the awaitableSender so it forces an OperationTimeoutError + awaitableSender.sendTimeoutInSeconds = 0; + // Ensure that the connection will disconnect, and another sendBatch occurs while a sendBatch is in-flight. + setTimeout(() => { + // Trigger a disconnect on the underlying connection while the `sendBatch` is in flight. + clientConnectionContext.connection["_connection"].idle(); + // Triggering another sendBatch immediately after an idle + // used to cause the rhea connection remote state to be cleared. + // This caused the in-flight sendBatch to throw an uncaught error + // if it timed out. + client.sendBatch([{ body: "test3" }]); + }, 0); + + await client.sendBatch([{ body: "test2" }]); + const newConnectionId = clientConnectionContext.connectionId; + + should.not.equal(originalConnectionId, newConnectionId); + + await client.close(); + }); + }); +}); diff --git a/sdk/eventhub/event-hubs/test/receiver.spec.ts b/sdk/eventhub/event-hubs/test/receiver.spec.ts index 465d3b142cf4..4166eb5b6d37 100644 --- a/sdk/eventhub/event-hubs/test/receiver.spec.ts +++ b/sdk/eventhub/event-hubs/test/receiver.spec.ts @@ -453,30 +453,6 @@ describe("EventHub Receiver", function(): void { }); describe("in batch mode", function(): void { - describe("disconnected", function() { - it("should receive after a disconnect", async () => { - const client = new EventHubClient(service.connectionString, service.path); - const receiver = client.createConsumer(EventHubClient.defaultConsumerGroupName, "0", { - sequenceNumber: 0 - }); - const clientConnectionContext = receiver["_context"]; - - await receiver.receiveBatch(1, 1); - const originalConnectionId = clientConnectionContext.connectionId; - - // Trigger a disconnect on the underlying connection. - clientConnectionContext.connection["_connection"].idle(); - - await receiver.receiveBatch(1, 1); - const newConnectionId = clientConnectionContext.connectionId; - - should.not.equal(originalConnectionId, newConnectionId); - - await receiver.close(); - await client.close(); - }); - }); - it("should receive messages correctly", async function(): Promise { const partitionId = partitionIds[0]; receiver = client.createConsumer( diff --git a/sdk/eventhub/event-hubs/test/sender.spec.ts b/sdk/eventhub/event-hubs/test/sender.spec.ts index 555f3070cff4..768f8fff906a 100644 --- a/sdk/eventhub/event-hubs/test/sender.spec.ts +++ b/sdk/eventhub/event-hubs/test/sender.spec.ts @@ -1351,58 +1351,6 @@ describe("EventHub Sender", function(): void { }); }); - describe("disconnected", function(): void { - it("should send after a disconnect", async () => { - const client = new EventHubProducerClient(service.connectionString, service.path); - const clientConnectionContext = client["_client"]["_context"]; - - await client.sendBatch([{ body: "test" }]); - const originalConnectionId = clientConnectionContext.connectionId; - - // Trigger a disconnect on the underlying connection. - clientConnectionContext.connection["_connection"].idle(); - - await client.sendBatch([{ body: "test2" }]); - const newConnectionId = clientConnectionContext.connectionId; - - should.not.equal(originalConnectionId, newConnectionId); - - await client.close(); - }); - - it("should not throw an uncaught exception", async () => { - const client = new EventHubProducerClient(service.connectionString, service.path); - const clientConnectionContext = client["_client"]["_context"]; - - // Send an event to open the connection. - await client.sendBatch([{ body: "test" }]); - const originalConnectionId = clientConnectionContext.connectionId; - - // We need to dig deep into the internals to get the awaitable sender so that . - const awaitableSender = client["_producersMap"].get("")!["_eventHubSender"]!["_sender"]!; - - // Change the timeout on the awaitableSender so it forces an OperationTimeoutError - awaitableSender.sendTimeoutInSeconds = 0; - // Ensure that the connection will disconnect, and another sendBatch occurs while a sendBatch is in-flight. - setTimeout(() => { - // Trigger a disconnect on the underlying connection while the `sendBatch` is in flight. - clientConnectionContext.connection["_connection"].idle(); - // Triggering another sendBatch immediately after an idle - // used to cause the rhea connection remote state to be cleared. - // This caused the in-flight sendBatch to throw an uncaught error - // if it timed out. - client.sendBatch([{ body: "test3" }]); - }, 0); - - await client.sendBatch([{ body: "test2" }]); - const newConnectionId = clientConnectionContext.connectionId; - - should.not.equal(originalConnectionId, newConnectionId); - - await client.close(); - }); - }); - async function sendBatch( bodies: any[], partitionId: string, From 37358cece37400064fdb8130176339bf9d3ea5a3 Mon Sep 17 00:00:00 2001 From: chradek Date: Thu, 14 May 2020 12:44:03 -0700 Subject: [PATCH 04/12] some cleanup --- sdk/eventhub/event-hubs/src/connectionContext.ts | 16 ++++++++-------- .../event-hubs/src/util/connectionUtils.ts | 12 ------------ 2 files changed, 8 insertions(+), 20 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/connectionContext.ts b/sdk/eventhub/event-hubs/src/connectionContext.ts index 84afb1e9ea5c..8caa2f347085 100644 --- a/sdk/eventhub/event-hubs/src/connectionContext.ts +++ b/sdk/eventhub/event-hubs/src/connectionContext.ts @@ -130,11 +130,11 @@ export namespace ConnectionContext { return Boolean(!this.connection.isOpen() && this.connection.isRemoteOpen()); }; connectionContext.isDisconnecting = false; - let waitForResolve: () => void; - let waitForPromise: Promise | undefined; + let waitForDisconnectResolve: () => void; + let waitForDisconnectPromise: Promise | undefined; connectionContext.waitForConnectionReset = function() { - if (this.isDisconnecting && waitForPromise) { - return waitForPromise; + if (this.isDisconnecting && waitForDisconnectPromise) { + return waitForDisconnectPromise; } return Promise.resolve(); }; @@ -155,8 +155,8 @@ export namespace ConnectionContext { return; } connectionContext.isDisconnecting = true; - waitForPromise = new Promise((resolve) => { - waitForResolve = resolve; + waitForDisconnectPromise = new Promise((resolve) => { + waitForDisconnectResolve = resolve; }); logger.verbose( @@ -241,8 +241,8 @@ export namespace ConnectionContext { } await refreshConnection(connectionContext); - waitForResolve(); - waitForPromise = undefined; + waitForDisconnectResolve(); + waitForDisconnectPromise = undefined; connectionContext.isDisconnecting = false; }; diff --git a/sdk/eventhub/event-hubs/src/util/connectionUtils.ts b/sdk/eventhub/event-hubs/src/util/connectionUtils.ts index 066ff8cfa389..4304e59c2083 100644 --- a/sdk/eventhub/event-hubs/src/util/connectionUtils.ts +++ b/sdk/eventhub/event-hubs/src/util/connectionUtils.ts @@ -5,18 +5,6 @@ import { ConnectionEvents } from "rhea-promise"; import { ConnectionContext } from "../connectionContext"; import { logger } from "../log"; -/** - * Checks if the connection on a ConnectionContext is closing. - * @internal - * @ignore - * @param context - */ -export function isConnectionClosing(context: ConnectionContext): boolean { - // When the connection is not open, but the remote end is open, - // then the rhea connection is in the process of terminating. - return Boolean(!context.connection.isOpen() && context.connection.isRemoteOpen()); -} - /** * Resolves once the context's connection emits a 'disconnected' event. * @internal From 8369442262e70783cdfdca2e0f0e50b47c3c8807 Mon Sep 17 00:00:00 2001 From: chradek Date: Mon, 1 Jun 2020 14:58:12 -0700 Subject: [PATCH 05/12] clean up connectionContext methods and other feedback updates --- .../event-hubs/src/connectionContext.ts | 131 ++++++++++++------ .../event-hubs/src/eventHubReceiver.ts | 35 +++-- sdk/eventhub/event-hubs/src/eventHubSender.ts | 39 +++--- .../event-hubs/src/managementClient.ts | 13 +- .../event-hubs/src/util/connectionUtils.ts | 23 --- .../event-hubs/test/node/disconnects.spec.ts | 10 +- 6 files changed, 131 insertions(+), 120 deletions(-) delete mode 100644 sdk/eventhub/event-hubs/src/util/connectionUtils.ts diff --git a/sdk/eventhub/event-hubs/src/connectionContext.ts b/sdk/eventhub/event-hubs/src/connectionContext.ts index 8caa2f347085..910d89f6e11b 100644 --- a/sdk/eventhub/event-hubs/src/connectionContext.ts +++ b/sdk/eventhub/event-hubs/src/connectionContext.ts @@ -1,8 +1,9 @@ -/* eslint-disable @azure/azure-sdk/ts-no-namespaces */ -/* eslint-disable no-inner-declarations */ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. +/* eslint-disable @azure/azure-sdk/ts-no-namespaces */ +/* eslint-disable no-inner-declarations */ + import { logger } from "./log"; import { getRuntimeInfo } from "./util/runtimeInfo"; import { packageJsonInfo } from "./util/constants"; @@ -51,9 +52,22 @@ export interface ConnectionContext extends ConnectionContextBase { */ managementSession?: ManagementClient; + /** + * Indicates whether the connection is in the process of closing. + */ isConnectionClosing(): boolean; - - isDisconnecting: boolean; + /** + * Resolves once the connectionContext is ready to be opened. + */ + readyToOpen(): Promise; + /** + * Resolves once the context's connection emits a `disconnected` event. + */ + waitForConnectionDisconnected(): Promise; + /** + * Resolves once the connection has finished being reset. + * Connections are reset as part of reacting to a `disconnected` event. + */ waitForConnectionReset(): Promise; } @@ -66,6 +80,25 @@ export interface ConnectionContextOptions extends EventHubClientOptions { managementSessionAudience?: string; } +/** + * Helper type to get the names of all the functions on an object. + */ +type FunctionPropertyNames = { [K in keyof T]: T[K] extends Function ? K : never }[keyof T]; +/** + * Helper type to get the types of all the functions on an object. + */ +type FunctionProperties = Pick>; +/** + * Helper type to get the types of all the functions on ConnectionContext. + * Note that this excludes the functions that ConnectionContext inherits. + * Each function also has its `this` type set as `ConnectionContext`. + */ +type ConnectionContextMethods = Omit< + FunctionProperties, + FunctionPropertyNames +> & + ThisType; + /** * @internal * @ignore @@ -124,20 +157,48 @@ export namespace ConnectionContext { audience: options.managementSessionAudience }; connectionContext.managementSession = new ManagementClient(connectionContext, mOptions); - connectionContext.isConnectionClosing = function() { - // When the connection is not open, but the remote end is open, - // then the rhea connection is in the process of terminating. - return Boolean(!this.connection.isOpen() && this.connection.isRemoteOpen()); - }; - connectionContext.isDisconnecting = false; + + let isDisconnecting = false; let waitForDisconnectResolve: () => void; let waitForDisconnectPromise: Promise | undefined; - connectionContext.waitForConnectionReset = function() { - if (this.isDisconnecting && waitForDisconnectPromise) { - return waitForDisconnectPromise; + + Object.assign(connectionContext, { + isConnectionClosing() { + // When the connection is not open, but the remote end is open, + // then the rhea connection is in the process of terminating. + return Boolean(!this.connection.isOpen() && this.connection.isRemoteOpen()); + }, + async readyToOpen() { + // Check that the connection isn't in the process of closing. + // This can happen when the idle timeout has been reached but + // the underlying socket is waiting to be destroyed. + if (this.isConnectionClosing()) { + // Wait for the disconnected event that indicates the underlying socket has closed. + await this.waitForConnectionDisconnected(); + } + // Check if the connection is currently in the process of disconnecting. + if (isDisconnecting) { + // Wait for the connection to be reset. + await this.waitForConnectionReset(); + } + }, + waitForConnectionDisconnected() { + return new Promise((resolve) => { + logger.verbose( + `[${this.connectionId}] Attempting to reinitialize connection` + + ` but the connection is in the process of closing.` + + ` Waiting for the disconnect event before continuing.` + ); + this.connection.once(ConnectionEvents.disconnected, resolve); + }); + }, + waitForConnectionReset() { + if (waitForDisconnectPromise) { + return waitForDisconnectPromise; + } + return Promise.resolve(); } - return Promise.resolve(); - }; + }); // Define listeners to be added to the connection object for // "connection_open" and "connection_error" events. @@ -151,10 +212,10 @@ export namespace ConnectionContext { }; const disconnected: OnAmqpEvent = async (context: EventContext) => { - if (connectionContext.isDisconnecting) { + if (isDisconnecting) { return; } - connectionContext.isDisconnecting = true; + isDisconnecting = true; waitForDisconnectPromise = new Promise((resolve) => { waitForDisconnectResolve = resolve; }); @@ -198,44 +259,26 @@ export namespace ConnectionContext { connectionContext.connection.removeAllSessions(); // Close the cbs session to ensure all the event handlers are released. - await connectionContext.cbsSession.close().catch((err) => { - logger.verbose( - "[%s] Error when closing cbsSession after disconnected event: %O", - connectionContext.connection.id, - err - ); + await connectionContext.cbsSession.close().catch(() => { + /* error already logged, swallow it here */ }); // Close the management session to ensure all the event handlers are released. - await connectionContext.managementSession!.close().catch((err) => { - logger.verbose( - "[%s] Error when closing managementSession after disconnected event: %O", - connectionContext.connection.id, - err - ); + await connectionContext.managementSession!.close().catch(() => { + /* error already logged, swallow it here */ }); // Close all senders and receivers to ensure clean up of timers & other resources. if (state.numSenders || state.numReceivers) { for (const senderName of Object.keys(connectionContext.senders)) { const sender = connectionContext.senders[senderName]; - await sender.close().catch((err) => { - logger.verbose( - "[%s] Error when closing sender [%s] after disconnected event: %O", - connectionContext.connection.id, - senderName, - err - ); + await sender.close().catch(() => { + /* error already logged, swallow it here */ }); } for (const receiverName of Object.keys(connectionContext.receivers)) { const receiver = connectionContext.receivers[receiverName]; - await receiver.close().catch((err) => { - logger.verbose( - "[%s] Error when closing sender [%s] after disconnected event: %O", - connectionContext.connection.id, - receiverName, - err - ); + await receiver.close().catch(() => { + /* error already logged, swallow it here */ }); } } @@ -243,7 +286,7 @@ export namespace ConnectionContext { await refreshConnection(connectionContext); waitForDisconnectResolve(); waitForDisconnectPromise = undefined; - connectionContext.isDisconnecting = false; + isDisconnecting = false; }; const protocolError: OnAmqpEvent = async (context: EventContext) => { diff --git a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts index 94fbf62ac1ad..c665218afd65 100644 --- a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts +++ b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts @@ -17,7 +17,6 @@ import { ConnectionContext } from "./connectionContext"; import { LinkEntity } from "./linkEntity"; import { EventPosition, getEventPositionFilter } from "./eventPosition"; import { AbortSignalLike, AbortError } from "@azure/abort-controller"; -import { waitForConnectionDisconnected } from "./util/connectionUtils"; /** * @ignore @@ -359,15 +358,22 @@ export class EventHubReceiver extends LinkEntity { * @returns */ async close(): Promise { - this.clearHandlers(); + try { + this.clearHandlers(); - if (!this._receiver) { - return; - } + if (!this._receiver) { + return; + } - const receiverLink = this._receiver; - this._deleteFromCache(); - await this._closeLink(receiverLink); + const receiverLink = this._receiver; + this._deleteFromCache(); + await this._closeLink(receiverLink); + } catch (err) { + const msg = `[${this._context.connectionId}] An error occurred while closing receiver ${this.name}: ${err}`; + logger.warning(msg); + logErrorStackTrace(err); + throw err; + } } /** @@ -513,17 +519,8 @@ export class EventHubReceiver extends LinkEntity { if (!this.isOpen() && !this.isConnecting) { this.isConnecting = true; - // Check that the connection isn't in the process of closing. - // This can happen when the idle timeout has been reached but - // the underlying socket is waiting to be destroyed. - if (this._context.isConnectionClosing()) { - // Wait for the disconnected event that indicates the underlying socket has closed. - await waitForConnectionDisconnected(this._context); - } - if (this._context.isDisconnecting) { - await this._context.waitForConnectionReset(); - } - + // Wait for the connectionContext to be ready for opening. + await this._context.readyToOpen(); await this._negotiateClaim(); const receiverOptions: CreateReceiverOptions = { diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 30d587bf1df1..bcd2de5bc5d8 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -31,7 +31,6 @@ import { SendOptions } from "./models/public"; import { getRetryAttemptTimeoutInMs } from "./util/retries"; import { AbortSignalLike, AbortError } from "@azure/abort-controller"; import { EventDataBatch, isEventDataBatch } from "./eventDataBatch"; -import { waitForConnectionDisconnected } from "./util/connectionUtils"; /** * Describes the EventHubSender that will send event data to EventHub. @@ -172,15 +171,22 @@ export class EventHubSender extends LinkEntity { * @returns Promise */ async close(): Promise { - if (this._sender) { - logger.info( - "[%s] Closing the Sender for the entity '%s'.", - this._context.connectionId, - this._context.config.entityPath - ); - const senderLink = this._sender; - this._deleteFromCache(); - await this._closeLink(senderLink); + try { + if (this._sender) { + logger.info( + "[%s] Closing the Sender for the entity '%s'.", + this._context.connectionId, + this._context.config.entityPath + ); + const senderLink = this._sender; + this._deleteFromCache(); + await this._closeLink(senderLink); + } + } catch (err) { + const msg = `[${this._context.connectionId}] An error occurred while closing sender ${this.name}: ${err}`; + logger.warning(msg); + logErrorStackTrace(err); + throw err; } } @@ -563,17 +569,8 @@ export class EventHubSender extends LinkEntity { if (!this.isOpen() && !this.isConnecting) { this.isConnecting = true; - // Check that the connection isn't in the process of closing. - // This can happen when the idle timeout has been reached but - // the underlying socket is waiting to be destroyed. - if (this._context.isConnectionClosing()) { - // Wait for the disconnected event that indicates the underlying socket has closed. - await waitForConnectionDisconnected(this._context); - } - if (this._context.isDisconnecting) { - await this._context.waitForConnectionReset(); - } - + // Wait for the connectionContext to be ready for opening. + await this._context.readyToOpen(); await this._negotiateClaim(); logger.verbose( diff --git a/sdk/eventhub/event-hubs/src/managementClient.ts b/sdk/eventhub/event-hubs/src/managementClient.ts index 7381ab755a55..ee0efef9f827 100644 --- a/sdk/eventhub/event-hubs/src/managementClient.ts +++ b/sdk/eventhub/event-hubs/src/managementClient.ts @@ -28,7 +28,6 @@ import { LinkEntity } from "./linkEntity"; import { logger, logErrorStackTrace } from "./log"; import { getRetryAttemptTimeoutInMs } from "./util/retries"; import { AbortSignalLike, AbortError } from "@azure/abort-controller"; -import { waitForConnectionDisconnected } from "./util/connectionUtils"; /** * Describes the runtime information of an Event Hub. */ @@ -276,16 +275,8 @@ export class ManagementClient extends LinkEntity { private async _init(): Promise { try { if (!this._isMgmtRequestResponseLinkOpen()) { - // Check that the connection isn't in the process of closing. - // This can happen when the idle timeout has been reached but - // the underlying socket is waiting to be destroyed. - if (this._context.isConnectionClosing()) { - // Wait for the disconnected event that indicates the underlying socket has closed. - await waitForConnectionDisconnected(this._context); - } - if (this._context.isDisconnecting) { - await this._context.waitForConnectionReset(); - } + // Wait for the connectionContext to be ready for opening. + await this._context.readyToOpen(); await this._negotiateClaim(); const rxopt: ReceiverOptions = { source: { address: this.address }, diff --git a/sdk/eventhub/event-hubs/src/util/connectionUtils.ts b/sdk/eventhub/event-hubs/src/util/connectionUtils.ts deleted file mode 100644 index 4304e59c2083..000000000000 --- a/sdk/eventhub/event-hubs/src/util/connectionUtils.ts +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. - -import { ConnectionEvents } from "rhea-promise"; -import { ConnectionContext } from "../connectionContext"; -import { logger } from "../log"; - -/** - * Resolves once the context's connection emits a 'disconnected' event. - * @internal - * @ignore - * @param context - */ -export async function waitForConnectionDisconnected(context: ConnectionContext): Promise { - return new Promise((resolve) => { - logger.verbose( - `[${context.connectionId}] Attempting to reinitialize connection` + - ` but the connection is in the process of closing.` + - ` Waiting for the disconnect event before continuing.` - ); - context.connection.once(ConnectionEvents.disconnected, resolve); - }); -} diff --git a/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts b/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts index 4dbf5c654e74..b4c885abca13 100644 --- a/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts +++ b/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts @@ -44,10 +44,11 @@ describe("disconnected", function() { // Trigger a disconnect on the underlying connection. clientConnectionContext.connection["_connection"].idle(); - await client.getPartitionIds({}); + const partitionIds = await client.getPartitionIds({}); const newConnectionId = clientConnectionContext.connectionId; should.not.equal(originalConnectionId, newConnectionId); + partitionIds.length.should.greaterThan(0, "Invalid number of partition ids returned."); }); }); @@ -104,6 +105,7 @@ describe("disconnected", function() { // We need to dig deep into the internals to get the awaitable sender so that . const awaitableSender = client["_producersMap"].get("")!["_eventHubSender"]!["_sender"]!; + let thirdSend: Promise; // Change the timeout on the awaitableSender so it forces an OperationTimeoutError awaitableSender.sendTimeoutInSeconds = 0; // Ensure that the connection will disconnect, and another sendBatch occurs while a sendBatch is in-flight. @@ -114,7 +116,7 @@ describe("disconnected", function() { // used to cause the rhea connection remote state to be cleared. // This caused the in-flight sendBatch to throw an uncaught error // if it timed out. - client.sendBatch([{ body: "test3" }]); + thirdSend = client.sendBatch([{ body: "test3" }]); }, 0); await client.sendBatch([{ body: "test2" }]); @@ -122,6 +124,10 @@ describe("disconnected", function() { should.not.equal(originalConnectionId, newConnectionId); + // ensure the sendBatch from the setTimeout succeeded. + // Wait for the connectionContext to be ready for opening. + await thirdSend!; + await client.close(); }); }); From a9baa6e5877ef58d6682d1f4f4918862e1f9792c Mon Sep 17 00:00:00 2001 From: chradek Date: Thu, 4 Jun 2020 06:46:59 -0700 Subject: [PATCH 06/12] address feedback --- sdk/eventhub/event-hubs/CHANGELOG.md | 2 + sdk/eventhub/event-hubs/package.json | 2 +- .../event-hubs/src/connectionContext.ts | 45 +++++++++++++------ .../event-hubs/src/eventHubReceiver.ts | 4 +- sdk/eventhub/event-hubs/src/eventHubSender.ts | 4 +- .../event-hubs/src/managementClient.ts | 8 ++-- 6 files changed, 43 insertions(+), 22 deletions(-) diff --git a/sdk/eventhub/event-hubs/CHANGELOG.md b/sdk/eventhub/event-hubs/CHANGELOG.md index 3116a47366fe..8adec125f1da 100644 --- a/sdk/eventhub/event-hubs/CHANGELOG.md +++ b/sdk/eventhub/event-hubs/CHANGELOG.md @@ -4,6 +4,8 @@ - Fixes issue [#8584](https://github.com/Azure/azure-sdk-for-js/issues/8584) where a `TypeError` was sometimes thrown as an uncaught exception. + ([PR 8884](https://github.com/Azure/azure-sdk-for-js/pull/8884)) +- Fixes reconnection issues by creating a new connection object rather than re-using the existing one. ([PR 8884](https://github.com/Azure/azure-sdk-for-js/pull/8884)) ### Tracing updates: diff --git a/sdk/eventhub/event-hubs/package.json b/sdk/eventhub/event-hubs/package.json index 6c485ce56bdd..231001d7b38e 100644 --- a/sdk/eventhub/event-hubs/package.json +++ b/sdk/eventhub/event-hubs/package.json @@ -81,7 +81,7 @@ }, "dependencies": { "@azure/abort-controller": "^1.0.0", - "@azure/core-amqp": "^1.1.1", + "@azure/core-amqp": "^1.1.3", "@azure/core-asynciterator-polyfill": "^1.0.0", "@azure/core-tracing": "1.0.0-preview.8", "@azure/logger": "^1.0.0", diff --git a/sdk/eventhub/event-hubs/src/connectionContext.ts b/sdk/eventhub/event-hubs/src/connectionContext.ts index 910d89f6e11b..774f98773ff8 100644 --- a/sdk/eventhub/event-hubs/src/connectionContext.ts +++ b/sdk/eventhub/event-hubs/src/connectionContext.ts @@ -51,19 +51,35 @@ export interface ConnectionContext extends ConnectionContextBase { * the underlying amqp connection for the EventHub Client. */ managementSession?: ManagementClient; + /** + * Resolves once the connectionContext is ready to open a link. + * A link can be opened when: + * - The connection is already opened on both sides. + * - The connection has been closed or disconnected. + * A link is not ready to be opened if the connection + * is in the process of closing or disconnecting. + */ + readyToOpenLink(): Promise; +} +/** + * Describes the members on the ConnectionContext that are only + * used by it internally. + * @ignore + * @internal + */ +export interface ConnectionContextInternalMembers extends ConnectionContext { /** * Indicates whether the connection is in the process of closing. + * When this returns `true`, a `disconnected` event will be received + * after the connection is closed. + * */ isConnectionClosing(): boolean; - /** - * Resolves once the connectionContext is ready to be opened. - */ - readyToOpen(): Promise; /** * Resolves once the context's connection emits a `disconnected` event. */ - waitForConnectionDisconnected(): Promise; + waitForDisconnectedEvent(): Promise; /** * Resolves once the connection has finished being reset. * Connections are reset as part of reacting to a `disconnected` event. @@ -89,15 +105,16 @@ type FunctionPropertyNames = { [K in keyof T]: T[K] extends Function ? K : ne */ type FunctionProperties = Pick>; /** - * Helper type to get the types of all the functions on ConnectionContext. + * Helper type to get the types of all the functions on ConnectionContext + * and the internal methods from ConnectionContextInternalMembers. * Note that this excludes the functions that ConnectionContext inherits. * Each function also has its `this` type set as `ConnectionContext`. */ type ConnectionContextMethods = Omit< - FunctionProperties, + FunctionProperties, FunctionPropertyNames > & - ThisType; + ThisType; /** * @internal @@ -168,13 +185,13 @@ export namespace ConnectionContext { // then the rhea connection is in the process of terminating. return Boolean(!this.connection.isOpen() && this.connection.isRemoteOpen()); }, - async readyToOpen() { + async readyToOpenLink() { // Check that the connection isn't in the process of closing. // This can happen when the idle timeout has been reached but // the underlying socket is waiting to be destroyed. if (this.isConnectionClosing()) { // Wait for the disconnected event that indicates the underlying socket has closed. - await this.waitForConnectionDisconnected(); + await this.waitForDisconnectedEvent(); } // Check if the connection is currently in the process of disconnecting. if (isDisconnecting) { @@ -182,7 +199,7 @@ export namespace ConnectionContext { await this.waitForConnectionReset(); } }, - waitForConnectionDisconnected() { + waitForDisconnectedEvent() { return new Promise((resolve) => { logger.verbose( `[${this.connectionId}] Attempting to reinitialize connection` + @@ -211,7 +228,7 @@ export namespace ConnectionContext { ); }; - const disconnected: OnAmqpEvent = async (context: EventContext) => { + const onDisconnected: OnAmqpEvent = async (context: EventContext) => { if (isDisconnecting) { return; } @@ -336,7 +353,7 @@ export namespace ConnectionContext { function addConnectionListeners(connection: Connection) { // Add listeners on the connection object. connection.on(ConnectionEvents.connectionOpen, onConnectionOpen); - connection.on(ConnectionEvents.disconnected, disconnected); + connection.on(ConnectionEvents.disconnected, onDisconnected); connection.on(ConnectionEvents.protocolError, protocolError); connection.on(ConnectionEvents.error, error); } @@ -347,7 +364,7 @@ export namespace ConnectionContext { ConnectionEvents.connectionOpen, onConnectionOpen ); - connectionContext.connection.removeListener(ConnectionEvents.disconnected, disconnected); + connectionContext.connection.removeListener(ConnectionEvents.disconnected, onDisconnected); connectionContext.connection.removeListener(ConnectionEvents.protocolError, protocolError); connectionContext.connection.removeListener(ConnectionEvents.error, error); // Close the connection diff --git a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts index c665218afd65..e97501866165 100644 --- a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts +++ b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts @@ -519,8 +519,8 @@ export class EventHubReceiver extends LinkEntity { if (!this.isOpen() && !this.isConnecting) { this.isConnecting = true; - // Wait for the connectionContext to be ready for opening. - await this._context.readyToOpen(); + // Wait for the connectionContext to be ready to open the link. + await this._context.readyToOpenLink(); await this._negotiateClaim(); const receiverOptions: CreateReceiverOptions = { diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index bcd2de5bc5d8..3db40e1b3610 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -569,8 +569,8 @@ export class EventHubSender extends LinkEntity { if (!this.isOpen() && !this.isConnecting) { this.isConnecting = true; - // Wait for the connectionContext to be ready for opening. - await this._context.readyToOpen(); + // Wait for the connectionContext to be ready to open the link. + await this._context.readyToOpenLink(); await this._negotiateClaim(); logger.verbose( diff --git a/sdk/eventhub/event-hubs/src/managementClient.ts b/sdk/eventhub/event-hubs/src/managementClient.ts index ee0efef9f827..c7b608e78d23 100644 --- a/sdk/eventhub/event-hubs/src/managementClient.ts +++ b/sdk/eventhub/event-hubs/src/managementClient.ts @@ -275,8 +275,8 @@ export class ManagementClient extends LinkEntity { private async _init(): Promise { try { if (!this._isMgmtRequestResponseLinkOpen()) { - // Wait for the connectionContext to be ready for opening. - await this._context.readyToOpen(); + // Wait for the connectionContext to be ready to open the link. + await this._context.readyToOpenLink(); await this._negotiateClaim(); const rxopt: ReceiverOptions = { source: { address: this.address }, @@ -293,7 +293,9 @@ export class ManagementClient extends LinkEntity { ); } }; - const sropt: SenderOptions = { target: { address: this.address } }; + const sropt: SenderOptions = { + target: { address: this.address } + }; logger.verbose( "[%s] Creating sender/receiver links on a session for $management endpoint with " + "srOpts: %o, receiverOpts: %O.", From 06b8e4e69d3e17a05bfb1eef91e93c1e43244daf Mon Sep 17 00:00:00 2001 From: chradek Date: Thu, 4 Jun 2020 06:47:36 -0700 Subject: [PATCH 07/12] update service-bus core-amqp dep --- sdk/servicebus/service-bus/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/service-bus/package.json b/sdk/servicebus/service-bus/package.json index 0f3fbfda19f2..68c50ac6a56d 100644 --- a/sdk/servicebus/service-bus/package.json +++ b/sdk/servicebus/service-bus/package.json @@ -86,7 +86,7 @@ }, "dependencies": { "@azure/abort-controller": "^1.0.0", - "@azure/core-amqp": "^1.1.1", + "@azure/core-amqp": "^1.1.3", "@azure/core-asynciterator-polyfill": "^1.0.0", "@azure/core-http": "^1.1.1", "@azure/core-tracing": "1.0.0-preview.8", From c98f6cff4bf26606e9c36ee9028a992d8b1b8014 Mon Sep 17 00:00:00 2001 From: chradek Date: Thu, 4 Jun 2020 07:48:10 -0700 Subject: [PATCH 08/12] fix test after merge --- sdk/eventhub/event-hubs/test/node/disconnects.spec.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts b/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts index b4c885abca13..5e6edb94f0b6 100644 --- a/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts +++ b/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts @@ -7,7 +7,7 @@ import chaiAsPromised from "chai-as-promised"; chai.use(chaiAsPromised); import { EnvVarKeys, getEnvVars } from "../utils/testUtils"; import { EventHubClient } from "../../src/impl/eventHubClient"; -import { EventHubProducerClient } from "../../src/index"; +import { EventHubConsumerClient, EventHubProducerClient } from "../../src/index"; const env = getEnvVars(); describe("disconnected", function() { @@ -55,7 +55,7 @@ describe("disconnected", function() { describe("Receiver", function() { it("should receive after a disconnect", async () => { client = new EventHubClient(service.connectionString, service.path); - const receiver = client.createConsumer(EventHubClient.defaultConsumerGroupName, "0", { + const receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, "0", { sequenceNumber: 0 }); const clientConnectionContext = receiver["_context"]; From 00943d7482bc18d86e7337e51bdf06817f66b55a Mon Sep 17 00:00:00 2001 From: chradek Date: Thu, 4 Jun 2020 07:48:23 -0700 Subject: [PATCH 09/12] fix flaky receiver test --- sdk/eventhub/event-hubs/test/receiver.spec.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/event-hubs/test/receiver.spec.ts b/sdk/eventhub/event-hubs/test/receiver.spec.ts index 150d684cac25..a3b8fb589999 100644 --- a/sdk/eventhub/event-hubs/test/receiver.spec.ts +++ b/sdk/eventhub/event-hubs/test/receiver.spec.ts @@ -261,13 +261,13 @@ describe("EventHub Receiver", function(): void { describe("in streaming mode", function(): void { it("should receive messages correctly", async function(): Promise { const partitionId = partitionIds[0]; - const time = Date.now(); + const pInfo = await client.getPartitionProperties(partitionId); // send a message that can be received await producerClient.sendBatch([{ body: "receive behaves correctly" }], { partitionId }); receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, { - enqueuedOn: time + sequenceNumber: pInfo.lastEnqueuedSequenceNumber }); const received: ReceivedEventData[] = await new Promise((resolve, reject) => { From 883325d8ec470358128bf828bd1ea09ceb6fb3cc Mon Sep 17 00:00:00 2001 From: chradek <51000525+chradek@users.noreply.github.com> Date: Thu, 4 Jun 2020 10:50:59 -0700 Subject: [PATCH 10/12] Update sdk/eventhub/event-hubs/src/connectionContext.ts Co-authored-by: Ramya Rao --- sdk/eventhub/event-hubs/src/connectionContext.ts | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/connectionContext.ts b/sdk/eventhub/event-hubs/src/connectionContext.ts index 0439ed9e2c9a..e762e44e4875 100644 --- a/sdk/eventhub/event-hubs/src/connectionContext.ts +++ b/sdk/eventhub/event-hubs/src/connectionContext.ts @@ -52,11 +52,12 @@ export interface ConnectionContext extends ConnectionContextBase { */ managementSession?: ManagementClient; /** - * Resolves once the connectionContext is ready to open a link. - * A link can be opened when: - * - The connection is already opened on both sides. - * - The connection has been closed or disconnected. - * A link is not ready to be opened if the connection + * Function returning a promise that resolves once the connectionContext is ready to open an AMQP link. + * ConnectionContext will be ready to open an AMQP link when: + * - The AMQP connection is already open on both sides. + * - The AMQP connection has been closed or disconnected. In this case, a new AMQP connection is expected + * to be created first. + * An AMQP link cannot be opened if the AMQP connection * is in the process of closing or disconnecting. */ readyToOpenLink(): Promise; From eb0e0f29ac1644e40e4518b7a10a28c57fe760c2 Mon Sep 17 00:00:00 2001 From: chradek Date: Thu, 4 Jun 2020 12:50:18 -0700 Subject: [PATCH 11/12] remove unneeded variable --- sdk/eventhub/event-hubs/src/connectionContext.ts | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/connectionContext.ts b/sdk/eventhub/event-hubs/src/connectionContext.ts index e762e44e4875..6462361d8b8c 100644 --- a/sdk/eventhub/event-hubs/src/connectionContext.ts +++ b/sdk/eventhub/event-hubs/src/connectionContext.ts @@ -176,7 +176,6 @@ export namespace ConnectionContext { }; connectionContext.managementSession = new ManagementClient(connectionContext, mOptions); - let isDisconnecting = false; let waitForDisconnectResolve: () => void; let waitForDisconnectPromise: Promise | undefined; @@ -195,7 +194,7 @@ export namespace ConnectionContext { await this.waitForDisconnectedEvent(); } // Check if the connection is currently in the process of disconnecting. - if (isDisconnecting) { + if (waitForDisconnectPromise) { // Wait for the connection to be reset. await this.waitForConnectionReset(); } @@ -230,10 +229,9 @@ export namespace ConnectionContext { }; const onDisconnected: OnAmqpEvent = async (context: EventContext) => { - if (isDisconnecting) { + if (waitForDisconnectPromise) { return; } - isDisconnecting = true; waitForDisconnectPromise = new Promise((resolve) => { waitForDisconnectResolve = resolve; }); @@ -304,7 +302,6 @@ export namespace ConnectionContext { await refreshConnection(connectionContext); waitForDisconnectResolve(); waitForDisconnectPromise = undefined; - isDisconnecting = false; }; const protocolError: OnAmqpEvent = async (context: EventContext) => { From f4629564c06d7a1f98f27352aea1026903d4242c Mon Sep 17 00:00:00 2001 From: chradek <51000525+chradek@users.noreply.github.com> Date: Thu, 4 Jun 2020 13:56:06 -0700 Subject: [PATCH 12/12] Update sdk/eventhub/event-hubs/CHANGELOG.md Co-authored-by: Ramya Rao --- sdk/eventhub/event-hubs/CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/eventhub/event-hubs/CHANGELOG.md b/sdk/eventhub/event-hubs/CHANGELOG.md index 8adec125f1da..a7892ce6d77e 100644 --- a/sdk/eventhub/event-hubs/CHANGELOG.md +++ b/sdk/eventhub/event-hubs/CHANGELOG.md @@ -3,7 +3,8 @@ ## 5.2.1 (Unreleased) - Fixes issue [#8584](https://github.com/Azure/azure-sdk-for-js/issues/8584) - where a `TypeError` was sometimes thrown as an uncaught exception. + where attempting to create AMQP links when the AMQP connection was in the + process of closing resulted in a `TypeError` in an uncaught exception. ([PR 8884](https://github.com/Azure/azure-sdk-for-js/pull/8884)) - Fixes reconnection issues by creating a new connection object rather than re-using the existing one. ([PR 8884](https://github.com/Azure/azure-sdk-for-js/pull/8884))