Skip to content

Commit

Permalink
[event-hubs] fix uncaught TypeError (#8884)
Browse files Browse the repository at this point in the history
* [event-hubs] fix uncaught TypeError

* [event-hubs] create new rhea connection on disconnected

* make disconnect tests node only

* some cleanup

* clean up connectionContext methods and other feedback updates

* address feedback

* update service-bus core-amqp dep

* fix test after merge

* fix flaky receiver test

* Update sdk/eventhub/event-hubs/src/connectionContext.ts

Co-authored-by: Ramya Rao <[email protected]>

* remove unneeded variable

* Update sdk/eventhub/event-hubs/CHANGELOG.md

Co-authored-by: Ramya Rao <[email protected]>

Co-authored-by: Ramya Rao <[email protected]>
  • Loading branch information
chradek and ramya-rao-a authored Jun 4, 2020
1 parent c9dd990 commit b33d7df
Show file tree
Hide file tree
Showing 9 changed files with 353 additions and 51 deletions.
6 changes: 6 additions & 0 deletions sdk/eventhub/event-hubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

## 5.2.1 (Unreleased)

- Fixes issue [#8584](https://github.com/Azure/azure-sdk-for-js/issues/8584)
where attempting to create AMQP links when the AMQP connection was in the
process of closing resulted in a `TypeError` in an uncaught exception.
([PR 8884](https://github.com/Azure/azure-sdk-for-js/pull/8884))
- Fixes reconnection issues by creating a new connection object rather than re-using the existing one. ([PR 8884](https://github.com/Azure/azure-sdk-for-js/pull/8884))

### Tracing updates:

Tracing functionality is still in preview status and the APIs may have breaking
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
},
"dependencies": {
"@azure/abort-controller": "^1.0.0",
"@azure/core-amqp": "^1.1.1",
"@azure/core-amqp": "^1.1.3",
"@azure/core-asynciterator-polyfill": "^1.0.0",
"@azure/core-tracing": "1.0.0-preview.8",
"@azure/logger": "^1.0.0",
Expand Down
194 changes: 165 additions & 29 deletions sdk/eventhub/event-hubs/src/connectionContext.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

/* eslint-disable @azure/azure-sdk/ts-no-namespaces */
/* eslint-disable no-inner-declarations */

import { logger } from "./log";
import { getRuntimeInfo } from "./util/runtimeInfo";
import { packageJsonInfo } from "./util/constants";
Expand All @@ -16,7 +19,7 @@ import {
} from "@azure/core-amqp";
import { ManagementClient, ManagementClientOptions } from "./managementClient";
import { EventHubClientOptions } from "./models/public";
import { ConnectionEvents, Dictionary, EventContext, OnAmqpEvent } from "rhea-promise";
import { Connection, ConnectionEvents, Dictionary, EventContext, OnAmqpEvent } from "rhea-promise";

/**
* @internal
Expand Down Expand Up @@ -48,6 +51,41 @@ export interface ConnectionContext extends ConnectionContextBase {
* the underlying amqp connection for the EventHub Client.
*/
managementSession?: ManagementClient;
/**
* Function returning a promise that resolves once the connectionContext is ready to open an AMQP link.
* ConnectionContext will be ready to open an AMQP link when:
* - The AMQP connection is already open on both sides.
* - The AMQP connection has been closed or disconnected. In this case, a new AMQP connection is expected
* to be created first.
* An AMQP link cannot be opened if the AMQP connection
* is in the process of closing or disconnecting.
*/
readyToOpenLink(): Promise<void>;
}

/**
* Describes the members on the ConnectionContext that are only
* used by it internally.
* @ignore
* @internal
*/
export interface ConnectionContextInternalMembers extends ConnectionContext {
/**
* Indicates whether the connection is in the process of closing.
* When this returns `true`, a `disconnected` event will be received
* after the connection is closed.
*
*/
isConnectionClosing(): boolean;
/**
* Resolves once the context's connection emits a `disconnected` event.
*/
waitForDisconnectedEvent(): Promise<void>;
/**
* Resolves once the connection has finished being reset.
* Connections are reset as part of reacting to a `disconnected` event.
*/
waitForConnectionReset(): Promise<void>;
}

/**
Expand All @@ -59,6 +97,26 @@ export interface ConnectionContextOptions extends EventHubClientOptions {
managementSessionAudience?: string;
}

/**
* Helper type to get the names of all the functions on an object.
*/
type FunctionPropertyNames<T> = { [K in keyof T]: T[K] extends Function ? K : never }[keyof T];
/**
* Helper type to get the types of all the functions on an object.
*/
type FunctionProperties<T> = Pick<T, FunctionPropertyNames<T>>;
/**
* Helper type to get the types of all the functions on ConnectionContext
* and the internal methods from ConnectionContextInternalMembers.
* Note that this excludes the functions that ConnectionContext inherits.
* Each function also has its `this` type set as `ConnectionContext`.
*/
type ConnectionContextMethods = Omit<
FunctionProperties<ConnectionContextInternalMembers>,
FunctionPropertyNames<ConnectionContextBase>
> &
ThisType<ConnectionContextInternalMembers>;

/**
* @internal
* @ignore
Expand Down Expand Up @@ -118,6 +176,47 @@ export namespace ConnectionContext {
};
connectionContext.managementSession = new ManagementClient(connectionContext, mOptions);

let waitForDisconnectResolve: () => void;
let waitForDisconnectPromise: Promise<void> | undefined;

Object.assign<ConnectionContext, ConnectionContextMethods>(connectionContext, {
isConnectionClosing() {
// When the connection is not open, but the remote end is open,
// then the rhea connection is in the process of terminating.
return Boolean(!this.connection.isOpen() && this.connection.isRemoteOpen());
},
async readyToOpenLink() {
// Check that the connection isn't in the process of closing.
// This can happen when the idle timeout has been reached but
// the underlying socket is waiting to be destroyed.
if (this.isConnectionClosing()) {
// Wait for the disconnected event that indicates the underlying socket has closed.
await this.waitForDisconnectedEvent();
}
// Check if the connection is currently in the process of disconnecting.
if (waitForDisconnectPromise) {
// Wait for the connection to be reset.
await this.waitForConnectionReset();
}
},
waitForDisconnectedEvent() {
return new Promise((resolve) => {
logger.verbose(
`[${this.connectionId}] Attempting to reinitialize connection` +
` but the connection is in the process of closing.` +
` Waiting for the disconnect event before continuing.`
);
this.connection.once(ConnectionEvents.disconnected, resolve);
});
},
waitForConnectionReset() {
if (waitForDisconnectPromise) {
return waitForDisconnectPromise;
}
return Promise.resolve();
}
});

// Define listeners to be added to the connection object for
// "connection_open" and "connection_error" events.
const onConnectionOpen: OnAmqpEvent = () => {
Expand All @@ -129,7 +228,14 @@ export namespace ConnectionContext {
);
};

const disconnected: OnAmqpEvent = async (context: EventContext) => {
const onDisconnected: OnAmqpEvent = async (context: EventContext) => {
if (waitForDisconnectPromise) {
return;
}
waitForDisconnectPromise = new Promise((resolve) => {
waitForDisconnectResolve = resolve;
});

logger.verbose(
"[%s] 'disconnected' event occurred on the amqp connection.",
connectionContext.connection.id
Expand Down Expand Up @@ -169,39 +275,33 @@ export namespace ConnectionContext {
connectionContext.connection.removeAllSessions();

// Close the cbs session to ensure all the event handlers are released.
await connectionContext.cbsSession.close();
await connectionContext.cbsSession.close().catch(() => {
/* error already logged, swallow it here */
});
// Close the management session to ensure all the event handlers are released.
await connectionContext.managementSession!.close();
await connectionContext.managementSession!.close().catch(() => {
/* error already logged, swallow it here */
});

// Close all senders and receivers to ensure clean up of timers & other resources.
if (state.numSenders || state.numReceivers) {
for (const senderName of Object.keys(connectionContext.senders)) {
const sender = connectionContext.senders[senderName];
if (!sender.isConnecting) {
await sender.close().catch((err) => {
logger.verbose(
"[%s] Error when closing sender [%s] after disconnected event: %O",
connectionContext.connection.id,
senderName,
err
);
});
}
await sender.close().catch(() => {
/* error already logged, swallow it here */
});
}
for (const receiverName of Object.keys(connectionContext.receivers)) {
const receiver = connectionContext.receivers[receiverName];
if (!receiver.isConnecting) {
await receiver.close().catch((err) => {
logger.verbose(
"[%s] Error when closing sender [%s] after disconnected event: %O",
connectionContext.connection.id,
receiverName,
err
);
});
}
await receiver.close().catch(() => {
/* error already logged, swallow it here */
});
}
}

await refreshConnection(connectionContext);
waitForDisconnectResolve();
waitForDisconnectPromise = undefined;
};

const protocolError: OnAmqpEvent = async (context: EventContext) => {
Expand Down Expand Up @@ -248,11 +348,47 @@ export namespace ConnectionContext {
}
};

// Add listeners on the connection object.
connectionContext.connection.on(ConnectionEvents.connectionOpen, onConnectionOpen);
connectionContext.connection.on(ConnectionEvents.disconnected, disconnected);
connectionContext.connection.on(ConnectionEvents.protocolError, protocolError);
connectionContext.connection.on(ConnectionEvents.error, error);
function addConnectionListeners(connection: Connection) {
// Add listeners on the connection object.
connection.on(ConnectionEvents.connectionOpen, onConnectionOpen);
connection.on(ConnectionEvents.disconnected, onDisconnected);
connection.on(ConnectionEvents.protocolError, protocolError);
connection.on(ConnectionEvents.error, error);
}

function cleanConnectionContext(connectionContext: ConnectionContext) {
// Remove listeners from the connection object.
connectionContext.connection.removeListener(
ConnectionEvents.connectionOpen,
onConnectionOpen
);
connectionContext.connection.removeListener(ConnectionEvents.disconnected, onDisconnected);
connectionContext.connection.removeListener(ConnectionEvents.protocolError, protocolError);
connectionContext.connection.removeListener(ConnectionEvents.error, error);
// Close the connection
return connectionContext.connection.close();
}

async function refreshConnection(connectionContext: ConnectionContext) {
const originalConnectionId = connectionContext.connectionId;
try {
await cleanConnectionContext(connectionContext);
} catch (err) {
logger.verbose(
`[${connectionContext.connectionId}] There was an error closing the connection before reconnecting: %O`,
err
);
}

// Create a new connection, id, locks, and cbs client.
connectionContext.refreshConnection();
addConnectionListeners(connectionContext.connection);
logger.verbose(
`The connection "${originalConnectionId}" has been updated to "${connectionContext.connectionId}".`
);
}

addConnectionListeners(connectionContext.connection);

logger.verbose("[%s] Created connection context successfully.", connectionContext.connectionId);
return connectionContext;
Expand Down
24 changes: 17 additions & 7 deletions sdk/eventhub/event-hubs/src/eventHubReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -358,15 +358,22 @@ export class EventHubReceiver extends LinkEntity {
* @returns
*/
async close(): Promise<void> {
this.clearHandlers();
try {
this.clearHandlers();

if (!this._receiver) {
return;
}
if (!this._receiver) {
return;
}

const receiverLink = this._receiver;
this._deleteFromCache();
await this._closeLink(receiverLink);
const receiverLink = this._receiver;
this._deleteFromCache();
await this._closeLink(receiverLink);
} catch (err) {
const msg = `[${this._context.connectionId}] An error occurred while closing receiver ${this.name}: ${err}`;
logger.warning(msg);
logErrorStackTrace(err);
throw err;
}
}

/**
Expand Down Expand Up @@ -511,6 +518,9 @@ export class EventHubReceiver extends LinkEntity {
try {
if (!this.isOpen() && !this.isConnecting) {
this.isConnecting = true;

// Wait for the connectionContext to be ready to open the link.
await this._context.readyToOpenLink();
await this._negotiateClaim();

const receiverOptions: CreateReceiverOptions = {
Expand Down
28 changes: 19 additions & 9 deletions sdk/eventhub/event-hubs/src/eventHubSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,22 @@ export class EventHubSender extends LinkEntity {
* @returns Promise<void>
*/
async close(): Promise<void> {
if (this._sender) {
logger.info(
"[%s] Closing the Sender for the entity '%s'.",
this._context.connectionId,
this._context.config.entityPath
);
const senderLink = this._sender;
this._deleteFromCache();
await this._closeLink(senderLink);
try {
if (this._sender) {
logger.info(
"[%s] Closing the Sender for the entity '%s'.",
this._context.connectionId,
this._context.config.entityPath
);
const senderLink = this._sender;
this._deleteFromCache();
await this._closeLink(senderLink);
}
} catch (err) {
const msg = `[${this._context.connectionId}] An error occurred while closing sender ${this.name}: ${err}`;
logger.warning(msg);
logErrorStackTrace(err);
throw err;
}
}

Expand Down Expand Up @@ -561,6 +568,9 @@ export class EventHubSender extends LinkEntity {
try {
if (!this.isOpen() && !this.isConnecting) {
this.isConnecting = true;

// Wait for the connectionContext to be ready to open the link.
await this._context.readyToOpenLink();
await this._negotiateClaim();

logger.verbose(
Expand Down
10 changes: 8 additions & 2 deletions sdk/eventhub/event-hubs/src/managementClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,12 @@ export class ManagementClient extends LinkEntity {
*/
async close(): Promise<void> {
try {
// Always clear the timeout, as the isOpen check may report
// false without ever having cleared the timeout otherwise.
clearTimeout(this._tokenRenewalTimer as NodeJS.Timer);
if (this._isMgmtRequestResponseLinkOpen()) {
const mgmtLink = this._mgmtReqResLink;
this._mgmtReqResLink = undefined;
clearTimeout(this._tokenRenewalTimer as NodeJS.Timer);
await mgmtLink!.close();
logger.info("Successfully closed the management session.");
}
Expand All @@ -273,6 +275,8 @@ export class ManagementClient extends LinkEntity {
private async _init(): Promise<void> {
try {
if (!this._isMgmtRequestResponseLinkOpen()) {
// Wait for the connectionContext to be ready to open the link.
await this._context.readyToOpenLink();
await this._negotiateClaim();
const rxopt: ReceiverOptions = {
source: { address: this.address },
Expand All @@ -289,7 +293,9 @@ export class ManagementClient extends LinkEntity {
);
}
};
const sropt: SenderOptions = { target: { address: this.address } };
const sropt: SenderOptions = {
target: { address: this.address }
};
logger.verbose(
"[%s] Creating sender/receiver links on a session for $management endpoint with " +
"srOpts: %o, receiverOpts: %O.",
Expand Down
Loading

0 comments on commit b33d7df

Please sign in to comment.