Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: graceful shutdown #1994

Merged
merged 9 commits into from
Nov 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion lib/connextclient/ConnextClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ class ConnextClient extends SwapClient {
private outboundAmounts = new Map<string, number>();
private inboundAmounts = new Map<string, number>();

private pendingRequests = new Set<http.ClientRequest>();
private criticalRequestPaths = ['/hashlock-resolve', '/hashlock-transfer'];

/** The minimum incremental quantity that we may use for collateral requests. */
private static MIN_COLLATERAL_REQUEST_SIZES: { [key: string]: number | undefined } = {
ETH: 0.1 * 10 ** 8,
Expand Down Expand Up @@ -866,6 +869,16 @@ class ConnextClient extends SwapClient {
/** Connext client specific cleanup. */
protected disconnect = async () => {
this.setStatus(ClientStatus.Disconnected);

for (const req of this.pendingRequests) {
if (this.criticalRequestPaths.includes(req.path)) {
this.logger.warn(`critical request is pending: ${req.path}`);
continue;
}

this.logger.info(`aborting pending request: ${req.path}`);
req.destroy();
}
}

/**
Expand Down Expand Up @@ -893,7 +906,11 @@ class ConnextClient extends SwapClient {
}

this.logger.trace(`sending request to ${endpoint}${payloadStr ? `: ${payloadStr}` : ''}`);
const req = http.request(options, async (res) => {

let req: http.ClientRequest;
req = http.request(options, async (res) => {
this.pendingRequests.delete(req);

let err: XudError | undefined;
let body;
switch (res.statusCode) {
Expand Down Expand Up @@ -935,6 +952,7 @@ class ConnextClient extends SwapClient {
});

req.on('error', async (err: any) => {
this.pendingRequests.delete(req);
if (err.code === 'ECONNREFUSED') {
await this.disconnect();
}
Expand All @@ -945,7 +963,9 @@ class ConnextClient extends SwapClient {
if (payloadStr) {
req.write(payloadStr);
}

req.end();
this.pendingRequests.add(req);
});
}
}
Expand Down
11 changes: 8 additions & 3 deletions lib/p2p/Peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ class Peer extends EventEmitter {
}

this.status = PeerStatus.Closed;
this.revokeConnectionRetries();

if (this.socket) {
if (!this.socket.destroyed) {
Expand Down Expand Up @@ -521,6 +522,8 @@ class Peer extends EventEmitter {
this.connectionRetriesRevoked = false;

const connectViaProxy = () => {
this.socket = net.connect(torport, 'localhost');

const proxyOptions: SocksClientOptions = {
proxy: {
host: 'localhost',
Expand All @@ -532,11 +535,11 @@ class Peer extends EventEmitter {
host: this.address.host,
port: this.address.port,
},
existing_socket: this.socket,
};
SocksClient.createConnection(proxyOptions)
.then((info) => {
// a raw net.Socket that is established to the destination host through the given proxy server
this.socket = info.socket;
assert(this.socket === info.socket);
onConnect();
})
.catch(onError);
Expand Down Expand Up @@ -614,7 +617,9 @@ class Peer extends EventEmitter {
}

private initStall = (): void => {
assert(this.status !== PeerStatus.Closed);
if (this.status !== PeerStatus.Closed) {
return;
}
assert(!this.stallTimer);

this.stallTimer = setInterval(this.checkTimeout, Peer.STALL_INTERVAL);
Expand Down
20 changes: 14 additions & 6 deletions lib/p2p/Pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,11 @@ class Pool extends EventEmitter {
if (this.loadingNodesPromise) {
await this.loadingNodesPromise;
}
await Promise.all([this.unlisten(), this.closePendingConnections(), this.closePeers()]);
await Promise.all([
this.unlisten(),
this.closePendingConnections(DisconnectionReason.Shutdown),
this.closePeers(DisconnectionReason.Shutdown)],
);
this.connected = false;
this.disconnecting = false;
}
Expand All @@ -303,13 +307,17 @@ class Pool extends EventEmitter {
this.logger.debug(`Verifying reachability of advertised address: ${externalAddress}`);
try {
const peer = new Peer(Logger.DISABLED_LOGGER, address, this.network);

this.pendingOutboundPeers.set(this.nodePubKey, peer);
await peer.beginOpen({
ownNodeState: this.nodeState,
ownNodeKey: this.nodeKey,
ownVersion: this.version,
expectedNodePubKey: this.nodePubKey,
torport: this.config.torport,
});
this.pendingOutboundPeers.delete(this.nodePubKey);

await peer.close();
assert.fail();
} catch (err) {
Expand Down Expand Up @@ -1002,21 +1010,21 @@ class Pool extends EventEmitter {
}
}

private closePeers = () => {
private closePeers = (reason?: DisconnectionReason) => {
const closePromises = [];
for (const peer of this.peers.values()) {
closePromises.push(peer.close(DisconnectionReason.Shutdown));
closePromises.push(peer.close(reason));
}
return Promise.all(closePromises);
}

private closePendingConnections = () => {
private closePendingConnections = (reason?: DisconnectionReason) => {
const closePromises = [];
for (const peer of this.pendingOutboundPeers.values()) {
closePromises.push(peer.close());
closePromises.push(peer.close(reason));
}
for (const peer of this.pendingInboundPeers) {
closePromises.push(peer.close());
closePromises.push(peer.close(reason));
}
return Promise.all(closePromises);
}
Expand Down
30 changes: 29 additions & 1 deletion test/jest/Connext.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,20 @@ jest.mock('http', () => {
statusCode: 404,
});
}

let errorCb: any;
return {
path: options.path,
write: jest.fn(),
on: jest.fn(),
end: jest.fn(),
on: jest.fn().mockImplementation((event, cb) => {
if (event === 'error') {
errorCb = cb;
}
}),
destroy: jest.fn().mockImplementation(() => {
errorCb();
}),
};
}),
};
Expand All @@ -55,6 +65,8 @@ describe('ConnextClient', () => {
logger.trace = jest.fn();
logger.error = jest.fn();
logger.debug = jest.fn();
logger.warn = jest.fn();
logger.info = jest.fn();
const currencyInstances = [
{
id: 'ETH',
Expand Down Expand Up @@ -427,4 +439,20 @@ describe('ConnextClient', () => {
expect(connext['sendRequest']).toHaveBeenCalledTimes(0);
});
});

describe('disconnect', () => {
it('aborts pending requests, except critical ones', async () => {
expect(connext['pendingRequests'].size).toEqual(0);

connext['sendRequest'](connext['criticalRequestPaths'][0], '', {});
connext['sendRequest']('/path1', '', {});
connext['sendRequest']('/path1', '', {});
connext['sendRequest']('/path2', '', {});
connext['sendRequest'](connext['criticalRequestPaths'][1], '', {});
expect(connext['pendingRequests'].size).toEqual(5);

connext['disconnect']();
expect(connext['pendingRequests'].size).toEqual(2);
});
});
});