Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[event-hubs] fix uncaught TypeError #8884

Merged
merged 13 commits into from
Jun 4, 2020
5 changes: 5 additions & 0 deletions sdk/eventhub/event-hubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## 5.2.1 (Unreleased)

- Fixes issue [#8584](https://github.com/Azure/azure-sdk-for-js/issues/8584)
where a `TypeError` was sometimes thrown as an uncaught exception.
chradek marked this conversation as resolved.
Show resolved Hide resolved
([PR 8884](https://github.com/Azure/azure-sdk-for-js/pull/8884))
- Fixes reconnection issues by creating a new connection object rather than re-using the existing one. ([PR 8884](https://github.com/Azure/azure-sdk-for-js/pull/8884))

### Tracing updates:

Tracing functionality is still in preview status and the APIs may have breaking
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
},
"dependencies": {
"@azure/abort-controller": "^1.0.0",
"@azure/core-amqp": "^1.1.1",
"@azure/core-amqp": "^1.1.3",
"@azure/core-asynciterator-polyfill": "^1.0.0",
"@azure/core-tracing": "1.0.0-preview.8",
"@azure/logger": "^1.0.0",
Expand Down
197 changes: 168 additions & 29 deletions sdk/eventhub/event-hubs/src/connectionContext.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

/* eslint-disable @azure/azure-sdk/ts-no-namespaces */
/* eslint-disable no-inner-declarations */

import { logger } from "./log";
import { getRuntimeInfo } from "./util/runtimeInfo";
import { packageJsonInfo } from "./util/constants";
Expand All @@ -16,7 +19,7 @@ import {
} from "@azure/core-amqp";
import { ManagementClient, ManagementClientOptions } from "./managementClient";
import { EventHubClientOptions } from "./models/public";
import { ConnectionEvents, Dictionary, EventContext, OnAmqpEvent } from "rhea-promise";
import { Connection, ConnectionEvents, Dictionary, EventContext, OnAmqpEvent } from "rhea-promise";

/**
* @internal
Expand Down Expand Up @@ -48,6 +51,41 @@ export interface ConnectionContext extends ConnectionContextBase {
* the underlying amqp connection for the EventHub Client.
*/
managementSession?: ManagementClient;
/**
* Function returning a promise that resolves once the connectionContext is ready to open an AMQP link.
* ConnectionContext will be ready to open an AMQP link when:
* - The AMQP connection is already open on both sides.
* - The AMQP connection has been closed or disconnected. In this case, a new AMQP connection is expected
* to be created first.
* An AMQP link cannot be opened if the AMQP connection
* is in the process of closing or disconnecting.
*/
readyToOpenLink(): Promise<void>;
}

ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
/**
* Describes the members on the ConnectionContext that are only
* used by it internally.
* @ignore
* @internal
*/
export interface ConnectionContextInternalMembers extends ConnectionContext {
/**
* Indicates whether the connection is in the process of closing.
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
* When this returns `true`, a `disconnected` event will be received
* after the connection is closed.
*
*/
isConnectionClosing(): boolean;
/**
* Resolves once the context's connection emits a `disconnected` event.
*/
waitForDisconnectedEvent(): Promise<void>;
/**
* Resolves once the connection has finished being reset.
* Connections are reset as part of reacting to a `disconnected` event.
*/
waitForConnectionReset(): Promise<void>;
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand All @@ -59,6 +97,26 @@ export interface ConnectionContextOptions extends EventHubClientOptions {
managementSessionAudience?: string;
}

/**
* Helper type to get the names of all the functions on an object.
*/
type FunctionPropertyNames<T> = { [K in keyof T]: T[K] extends Function ? K : never }[keyof T];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So...beautiful.....

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wish I could take credit but that's right out of the TypeScript handbook: https://www.typescriptlang.org/docs/handbook/advanced-types.html#distributive-conditional-types

/**
* Helper type to get the types of all the functions on an object.
*/
type FunctionProperties<T> = Pick<T, FunctionPropertyNames<T>>;
/**
* Helper type to get the types of all the functions on ConnectionContext
* and the internal methods from ConnectionContextInternalMembers.
* Note that this excludes the functions that ConnectionContext inherits.
* Each function also has its `this` type set as `ConnectionContext`.
*/
type ConnectionContextMethods = Omit<
FunctionProperties<ConnectionContextInternalMembers>,
FunctionPropertyNames<ConnectionContextBase>
> &
ThisType<ConnectionContextInternalMembers>;

/**
* @internal
* @ignore
Expand Down Expand Up @@ -118,6 +176,48 @@ export namespace ConnectionContext {
};
connectionContext.managementSession = new ManagementClient(connectionContext, mOptions);

let isDisconnecting = false;
let waitForDisconnectResolve: () => void;
let waitForDisconnectPromise: Promise<void> | undefined;

Object.assign<ConnectionContext, ConnectionContextMethods>(connectionContext, {
isConnectionClosing() {
// When the connection is not open, but the remote end is open,
// then the rhea connection is in the process of terminating.
return Boolean(!this.connection.isOpen() && this.connection.isRemoteOpen());
},
async readyToOpenLink() {
// Check that the connection isn't in the process of closing.
// This can happen when the idle timeout has been reached but
// the underlying socket is waiting to be destroyed.
if (this.isConnectionClosing()) {
// Wait for the disconnected event that indicates the underlying socket has closed.
await this.waitForDisconnectedEvent();
}
// Check if the connection is currently in the process of disconnecting.
if (isDisconnecting) {
// Wait for the connection to be reset.
await this.waitForConnectionReset();
}
},
waitForDisconnectedEvent() {
return new Promise((resolve) => {
logger.verbose(
`[${this.connectionId}] Attempting to reinitialize connection` +
` but the connection is in the process of closing.` +
` Waiting for the disconnect event before continuing.`
);
this.connection.once(ConnectionEvents.disconnected, resolve);
});
},
waitForConnectionReset() {
if (waitForDisconnectPromise) {
return waitForDisconnectPromise;
}
return Promise.resolve();
}
});

// Define listeners to be added to the connection object for
// "connection_open" and "connection_error" events.
const onConnectionOpen: OnAmqpEvent = () => {
Expand All @@ -129,7 +229,15 @@ export namespace ConnectionContext {
);
};

const disconnected: OnAmqpEvent = async (context: EventContext) => {
const onDisconnected: OnAmqpEvent = async (context: EventContext) => {
if (isDisconnecting) {
return;
}
isDisconnecting = true;
waitForDisconnectPromise = new Promise((resolve) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the isDisconnecting variable is basically the same as waitForDisconnectPromise != null. Worth it to remove and just have the one variable to track if we're disconnecting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah...I have a check in readyToOpenLink() that checks isDisconnecting, which just felt easier to understand than if (waitForDisconnectPromise) or if (Boolean(waitForDisconnectPromise)), but it might be better to have one source of truth.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated!

waitForDisconnectResolve = resolve;
});

logger.verbose(
"[%s] 'disconnected' event occurred on the amqp connection.",
connectionContext.connection.id
Expand Down Expand Up @@ -169,39 +277,34 @@ export namespace ConnectionContext {
connectionContext.connection.removeAllSessions();

// Close the cbs session to ensure all the event handlers are released.
await connectionContext.cbsSession.close();
await connectionContext.cbsSession.close().catch(() => {
/* error already logged, swallow it here */
});
// Close the management session to ensure all the event handlers are released.
await connectionContext.managementSession!.close();
await connectionContext.managementSession!.close().catch(() => {
/* error already logged, swallow it here */
});

// Close all senders and receivers to ensure clean up of timers & other resources.
if (state.numSenders || state.numReceivers) {
for (const senderName of Object.keys(connectionContext.senders)) {
const sender = connectionContext.senders[senderName];
if (!sender.isConnecting) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this if check removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible for disconnected to be emitted while connecting. For example, if the connection is viewed as being idle while the connection is still in the process of opening. This check was preventing us from closing the existing senders in this scenario.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this check is a remnant of the time when we recovered sender links when connection was disconnected. So the code below where we have sender.close() was previously sender.onDetached(). Since that need to be done if the sender is already connecting, we had this check. It should have been removed when we made the decision to not re-connect broken sender links in the background i.e. in #6737

await sender.close().catch((err) => {
logger.verbose(
"[%s] Error when closing sender [%s] after disconnected event: %O",
connectionContext.connection.id,
senderName,
err
);
});
}
await sender.close().catch(() => {
/* error already logged, swallow it here */
});
}
for (const receiverName of Object.keys(connectionContext.receivers)) {
const receiver = connectionContext.receivers[receiverName];
if (!receiver.isConnecting) {
await receiver.close().catch((err) => {
logger.verbose(
"[%s] Error when closing sender [%s] after disconnected event: %O",
connectionContext.connection.id,
receiverName,
err
);
});
}
await receiver.close().catch(() => {
/* error already logged, swallow it here */
});
}
}

await refreshConnection(connectionContext);
waitForDisconnectResolve();
waitForDisconnectPromise = undefined;
isDisconnecting = false;
};

const protocolError: OnAmqpEvent = async (context: EventContext) => {
Expand Down Expand Up @@ -248,11 +351,47 @@ export namespace ConnectionContext {
}
};

// Add listeners on the connection object.
connectionContext.connection.on(ConnectionEvents.connectionOpen, onConnectionOpen);
connectionContext.connection.on(ConnectionEvents.disconnected, disconnected);
connectionContext.connection.on(ConnectionEvents.protocolError, protocolError);
connectionContext.connection.on(ConnectionEvents.error, error);
function addConnectionListeners(connection: Connection) {
// Add listeners on the connection object.
connection.on(ConnectionEvents.connectionOpen, onConnectionOpen);
connection.on(ConnectionEvents.disconnected, onDisconnected);
connection.on(ConnectionEvents.protocolError, protocolError);
connection.on(ConnectionEvents.error, error);
}

function cleanConnectionContext(connectionContext: ConnectionContext) {
// Remove listeners from the connection object.
connectionContext.connection.removeListener(
ConnectionEvents.connectionOpen,
onConnectionOpen
);
connectionContext.connection.removeListener(ConnectionEvents.disconnected, onDisconnected);
connectionContext.connection.removeListener(ConnectionEvents.protocolError, protocolError);
connectionContext.connection.removeListener(ConnectionEvents.error, error);
// Close the connection
return connectionContext.connection.close();
}

async function refreshConnection(connectionContext: ConnectionContext) {
const originalConnectionId = connectionContext.connectionId;
try {
await cleanConnectionContext(connectionContext);
} catch (err) {
logger.verbose(
`[${connectionContext.connectionId}] There was an error closing the connection before reconnecting: %O`,
err
);
}

// Create a new connection, id, locks, and cbs client.
connectionContext.refreshConnection();
addConnectionListeners(connectionContext.connection);
logger.verbose(
`The connection "${originalConnectionId}" has been updated to "${connectionContext.connectionId}".`
);
}

addConnectionListeners(connectionContext.connection);

logger.verbose("[%s] Created connection context successfully.", connectionContext.connectionId);
return connectionContext;
Expand Down
24 changes: 17 additions & 7 deletions sdk/eventhub/event-hubs/src/eventHubReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -358,15 +358,22 @@ export class EventHubReceiver extends LinkEntity {
* @returns
*/
async close(): Promise<void> {
this.clearHandlers();
try {
this.clearHandlers();

if (!this._receiver) {
return;
}
if (!this._receiver) {
return;
}

const receiverLink = this._receiver;
this._deleteFromCache();
await this._closeLink(receiverLink);
const receiverLink = this._receiver;
this._deleteFromCache();
await this._closeLink(receiverLink);
} catch (err) {
const msg = `[${this._context.connectionId}] An error occurred while closing receiver ${this.name}: ${err}`;
logger.warning(msg);
logErrorStackTrace(err);
throw err;
}
}

/**
Expand Down Expand Up @@ -511,6 +518,9 @@ export class EventHubReceiver extends LinkEntity {
try {
if (!this.isOpen() && !this.isConnecting) {
this.isConnecting = true;

// Wait for the connectionContext to be ready to open the link.
await this._context.readyToOpenLink();
await this._negotiateClaim();

const receiverOptions: CreateReceiverOptions = {
Expand Down
28 changes: 19 additions & 9 deletions sdk/eventhub/event-hubs/src/eventHubSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,22 @@ export class EventHubSender extends LinkEntity {
* @returns Promise<void>
*/
async close(): Promise<void> {
if (this._sender) {
logger.info(
"[%s] Closing the Sender for the entity '%s'.",
this._context.connectionId,
this._context.config.entityPath
);
const senderLink = this._sender;
this._deleteFromCache();
await this._closeLink(senderLink);
try {
if (this._sender) {
logger.info(
"[%s] Closing the Sender for the entity '%s'.",
this._context.connectionId,
this._context.config.entityPath
);
const senderLink = this._sender;
this._deleteFromCache();
await this._closeLink(senderLink);
}
} catch (err) {
const msg = `[${this._context.connectionId}] An error occurred while closing sender ${this.name}: ${err}`;
logger.warning(msg);
logErrorStackTrace(err);
throw err;
}
}

Expand Down Expand Up @@ -561,6 +568,9 @@ export class EventHubSender extends LinkEntity {
try {
if (!this.isOpen() && !this.isConnecting) {
this.isConnecting = true;

// Wait for the connectionContext to be ready to open the link.
await this._context.readyToOpenLink();
await this._negotiateClaim();

logger.verbose(
Expand Down
Loading