Skip to content

Commit

Permalink
Merge pull request #718 from ExchangeUnion/shutdown-incoming-peer
Browse files Browse the repository at this point in the history
fix(p2p): terminate inbound peers on shutdown
  • Loading branch information
sangaman authored Dec 12, 2018
2 parents 295e7f9 + b9f312a commit ec086bf
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions lib/p2p/Pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ class Pool extends EventEmitter {
/** The local handshake data to be sent to newly connected peers. */
public handshakeData!: HandshakeState;
/** A map of pub keys to nodes for which we have pending outgoing connections. */
private pendingOutgoingConnections = new Map<string, Peer>();
private pendingOutboundPeers = new Map<string, Peer>();
/** A set of peers for which we have pending incoming connections. */
private pendingInboundPeers = new Set<Peer>();
/** A collection of known nodes on the XU network. */
private nodes: NodeList;
/** A collection of opened, active peers. */
Expand Down Expand Up @@ -322,12 +324,12 @@ class Pool extends EventEmitter {
throw errors.NODE_IS_BANNED(nodePubKey);
} else if (this.peers.has(nodePubKey)) {
throw errors.NODE_ALREADY_CONNECTED(nodePubKey, address);
} else if (this.pendingOutgoingConnections.has(nodePubKey)) {
} else if (this.pendingOutboundPeers.has(nodePubKey)) {
throw errors.ALREADY_CONNECTING(nodePubKey);
}

const peer = new Peer(this.logger, address);
this.pendingOutgoingConnections.set(nodePubKey, peer);
this.pendingOutboundPeers.set(nodePubKey, peer);
await this.openPeer(peer, nodePubKey, retryConnecting);
return peer;
}
Expand Down Expand Up @@ -355,7 +357,6 @@ class Pool extends EventEmitter {
try {
await peer.open(this.handshakeData, nodePubKey, retryConnecting);
} catch (err) {

// we don't have `nodePubKey` for inbound connections, which might fail on handshake
const id = nodePubKey || addressUtils.toString(peer.address);
this.logger.warn(`could not open connection to peer (${id}): ${err.message}`);
Expand Down Expand Up @@ -458,7 +459,9 @@ class Pool extends EventEmitter {

private addInbound = async (socket: Socket) => {
const peer = Peer.fromInbound(socket, this.logger);
this.pendingInboundPeers.add(peer);
await this.tryOpenPeer(peer);
this.pendingInboundPeers.delete(peer);
}

private handleSocket = async (socket: Socket) => {
Expand Down Expand Up @@ -644,20 +647,20 @@ class Pool extends EventEmitter {

peer.once('open', async () => {
await this.handleOpen(peer);
this.pendingOutgoingConnections.delete(peer.nodePubKey!);
this.pendingOutboundPeers.delete(peer.nodePubKey!);
});

peer.once('close', async () => {
if (!peer.nodePubKey && peer.expectedNodePubKey) {
this.pendingOutgoingConnections.delete(peer.expectedNodePubKey);
this.pendingOutboundPeers.delete(peer.expectedNodePubKey);
}

if (!peer.active) {
return;
}

if (peer.nodePubKey) {
this.pendingOutgoingConnections.delete(peer.nodePubKey);
this.pendingOutboundPeers.delete(peer.nodePubKey);
this.peers.delete(peer.nodePubKey);
}
this.emit('peer.close', peer);
Expand Down Expand Up @@ -696,9 +699,12 @@ class Pool extends EventEmitter {
}

private closePendingConnections = () => {
for (const peer of this.pendingOutgoingConnections.values()) {
for (const peer of this.pendingOutboundPeers.values()) {
peer.close();
}
this.pendingInboundPeers.forEach((peer) => {
peer.close();
});
}

/**
Expand Down

0 comments on commit ec086bf

Please sign in to comment.