diff --git a/README.md b/README.md index 11953dc9f1..3f78bb7876 100644 --- a/README.md +++ b/README.md @@ -26,16 +26,17 @@ libp2p-switch is used by [libp2p](https://github.com/libp2p/js-libp2p) but it ca - [Usage](#usage) - [Create a libp2p switch](#create-a-libp2p-switch) - [API](#api) - - [`switch.dial(peer, protocol, callback)`](#swarmdialpi-protocol-callback) - - [`switch.hangUp(peer, callback)`](#swarmhanguppi-callback) - - [`switch.handle(protocol, handler)`](#swarmhandleprotocol-handler) - - [`switch.unhandle(protocol)`](#swarmunhandleprotocol) - - [`switch.start(callback)`](#swarmlistencallback) - - [`switch.stop(callback)`](#swarmclosecallback) - - [`switch.connection`](#connection) + - [`switch.connection`](#switchconnection) + - [`switch.dial(peer, protocol, callback)`](#switchdialpeer-protocol-callback) + - [`switch.dialFSM(peer, protocol, callback)`](#switchdialfsmpeer-protocol-callback) + - [`switch.handle(protocol, handlerFunc, matchFunc)`](#switchhandleprotocol-handlerfunc-matchfunc) + - [`switch.hangUp(peer, callback)`](#switchhanguppeer-callback) + - [`switch.start(callback)`](#switchstartcallback) + - [`switch.stop(callback)`](#switchstopcallback) - [`switch.stats`](#stats-api) - - [Internal Transports API](#transports) -- [Design Notes](#designnotes) + - [`switch.unhandle(protocol)`](#switchunhandleprotocol) + - [Internal Transports API](#internal-transports-api) +- [Design Notes](#design-notes) - [Multitransport](#multitransport) - [Connection upgrades](#connection-upgrades) - [Identify](#identify) @@ -86,6 +87,46 @@ tests]([./test/pnet.node.js]). - peerInfo is a [PeerInfo](https://github.com/libp2p/js-peer-info) object that has the peer information. - peerBook is a [PeerBook](https://github.com/libp2p/js-peer-book) object that stores all the known peers. +### `switch.connection` + +##### `switch.connection.addUpgrade()` + +A connection upgrade must be able to receive and return something that implements the [interface-connection](https://github.com/libp2p/interface-connection) specification. + +> **WIP** + +##### `switch.connection.addStreamMuxer(muxer)` + +Upgrading a connection to use a stream muxer is still considered an upgrade, but a special case since once this connection is applied, the returned obj will implement the [interface-stream-muxer](https://github.com/libp2p/interface-stream-muxer) spec. + +- `muxer` + +##### `switch.connection.reuse()` + +Enable the identify protocol. + +##### `switch.connection.crypto([tag, encrypt])` + +Enable a specified crypto protocol. By default no encryption is used, aka `plaintext`. If called with no arguments it resets to use `plaintext`. + +You can use for example [libp2p-secio](https://github.com/libp2p/js-libp2p-secio) like this + +```js +const secio = require('libp2p-secio') +switch.connection.crypto(secio.tag, secio.encrypt) +``` + +##### `switch.connection.enableCircuitRelay(options, callback)` + +Enable circuit relaying. + +- `options` + - enabled - activates relay dialing and listening functionality + - hop - an object with two properties + - enabled - enables circuit relaying + - active - is it an active or passive relay (default false) +- `callback` + ### `switch.dial(peer, protocol, callback)` dial uses the best transport (whatever works first, in the future we can have some criteria), and jump starts the connection until the point where we have to negotiate the protocol. If a muxer is available, then drop the muxer onto that connection. Good to warm up connections or to check for connectivity. If we have already a muxer for that peerInfo, then do nothing. @@ -94,13 +135,24 @@ dial uses the best transport (whatever works first, in the future we can have so - `protocol` - `callback` -### `switch.hangUp(peer, callback)` +### `switch.dialFSM(peer, protocol, callback)` -Hang up the muxed connection we have with the peer. +works like dial, but calls back with a [Connection State Machine](#connection-state-machine) - `peer`: can be an instance of [PeerInfo][], [PeerId][] or [multiaddr][] -- `callback` +- `protocol`: String that defines the protocol (e.g '/ipfs/bitswap/1.1.0') to be used +- `callback`: Function with signature `function (err, connFSM) {}` where `connFSM` is a [Connection State Machine](#connection-state-machine) +#### Connection State Machine +Connection state machines emit a number of events that can be used to determine the current state of the connection +and to received the underlying connection that can be used to transfer data. + +##### Events +- `error`: emitted whenever a fatal error occurs with the connection; the error will be emitted. +- `error:upgrade_failed`: emitted whenever the connection fails to upgrade with a muxer, this is not fatal. +- `error:connection_attempt_failed`: emitted whenever a dial attempt fails for a given transport. An array of errors is emitted. +- `connection`: emitted whenever a useable connection has been established; the underlying [Connection](https://github.com/libp2p/interface-connection) will be emitted. +- `close`: emitted when the connection has closed. ### `switch.handle(protocol, handlerFunc, matchFunc)` @@ -110,68 +162,43 @@ Handle a new protocol. - `handlerFunc` - function called when we receive a dial on `protocol. Signature must be `function (protocol, conn) {}` - `matchFunc` - matchFunc for multistream-select -### `switch.unhandle(protocol)` - -Unhandle a protocol. - -- `protocol` - -### `switch.on('peer-mux-established', (peer) => {})` - -- `peer`: is instance of [PeerInfo][] that has info of the peer we have just established a muxed connection with. - -### `switch.on('peer-mux-closed', (peer) => {})` - -- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a muxed connection. - -### `switch.start(callback)` - -Start listening on all added transports that are available on the current `peerInfo`. - -### `switch.stop(callback)` +### `switch.hangUp(peer, callback)` -Close all the listeners and muxers. +Hang up the muxed connection we have with the peer. +- `peer`: can be an instance of [PeerInfo][], [PeerId][] or [multiaddr][] - `callback` -### `switch.connection` +### `switch.on('error', (err) => {})` -##### `switch.connection.addUpgrade()` +Emitted when the switch encounters an error. -A connection upgrade must be able to receive and return something that implements the [interface-connection](https://github.com/libp2p/interface-connection) specification. +- `err`: instance of [Error][] -> **WIP** +### `switch.on('peer-mux-established', (peer) => {})` -##### `switch.connection.addStreamMuxer(muxer)` +- `peer`: is instance of [PeerInfo][] that has info of the peer we have just established a muxed connection with. -Upgrading a connection to use a stream muxer is still considered an upgrade, but a special case since once this connection is applied, the returned obj will implement the [interface-stream-muxer](https://github.com/libp2p/interface-stream-muxer) spec. +### `switch.on('peer-mux-closed', (peer) => {})` -- `muxer` +- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a muxed connection. -##### `switch.connection.reuse()` +### `switch.on('start', () => {})` -Enable the identify protocol. +Emitted when the switch has successfully started. -##### `switch.connection.crypto([tag, encrypt])` +### `switch.on('stop', () => {})` -Enable a specified crypto protocol. By default no encryption is used, aka `plaintext`. If called with no arguments it resets to use `plaintext`. +Emitted when the switch has successfully stopped. -You can use for example [libp2p-secio](https://github.com/libp2p/js-libp2p-secio) like this +### `switch.start(callback)` -```js -const secio = require('libp2p-secio') -switch.connection.crypto(secio.tag, secio.encrypt) -``` +Start listening on all added transports that are available on the current `peerInfo`. -##### `switch.connection.enableCircuitRelay(options, callback)` +### `switch.stop(callback)` -Enable circuit relaying. +Close all the listeners and muxers. -- `options` - - enabled - activates relay dialing and listening functionality - - hop - an object with two properties - - enabled - enables circuit relaying - - active - is it an active or passive relay (default false) - `callback` ### Stats API @@ -278,6 +305,11 @@ Each one of these values is [an exponential moving-average instance](https://git Stats are not updated in real-time. Instead, measurements are buffered and stats are updated at an interval. The maximum interval can be defined through the `Switch` constructor option `stats.computeThrottleTimeout`, defined in miliseconds. +### `switch.unhandle(protocol)` + +Unhandle a protocol. + +- `protocol` ### Internal Transports API diff --git a/package.json b/package.json index 278cd530b0..4ca5dabef6 100644 --- a/package.json +++ b/package.json @@ -38,6 +38,7 @@ "devDependencies": { "aegir": "^15.1.0", "chai": "^4.1.2", + "chai-checkmark": "^1.0.1", "dirty-chai": "^2.0.1", "libp2p-mplex": "~0.8.2", "libp2p-pnet": "~0.1.0", @@ -54,7 +55,10 @@ "dependencies": { "async": "^2.6.1", "big.js": "^5.1.2", + "class-is": "^1.1.0", "debug": "^3.1.0", + "err-code": "^1.1.2", + "fsm-event": "^2.1.0", "hashlru": "^2.2.1", "interface-connection": "~0.3.2", "ip-address": "^5.8.9", diff --git a/src/connection/base.js b/src/connection/base.js new file mode 100644 index 0000000000..485bd7d202 --- /dev/null +++ b/src/connection/base.js @@ -0,0 +1,103 @@ +'use strict' + +const EventEmitter = require('events').EventEmitter +const debug = require('debug') +const withIs = require('class-is') + +class BaseConnection extends EventEmitter { + constructor ({ _switch, name }) { + super() + + this.switch = _switch + this.ourPeerInfo = this.switch._peerInfo + this.log = debug(`libp2p:conn:${name}`) + } + + /** + * Gets the current state of the connection + * + * @returns {string} The current state of the connection + */ + getState () { + return this._state._state + } + + /** + * Puts the state into encrypting mode + * + * @returns {void} + */ + encrypt () { + this._state('encrypt') + } + + /** + * Puts the state into privatizing mode + * + * @returns {void} + */ + protect () { + this._state('privatize') + } + + /** + * Puts the state into muxing mode + * + * @returns {void} + */ + upgrade () { + this._state('upgrade') + } + + /** + * Event handler for disconnected. + * + * @fires BaseConnection#close + * @returns {void} + */ + _onDisconnected () { + this.log(`disconnected from ${this.theirB58Id}`) + this.emit('close') + this.removeAllListeners() + } + + /** + * Event handler for privatized + * + * @fires BaseConnection#private + * @returns {void} + */ + _onPrivatized () { + this.log(`successfully privatized incoming connection`) + this.emit('private', this.conn) + } + + /** + * Wraps this.conn with the Switch.protector for private connections + * + * @private + * @fires ConnectionFSM#error + * @returns {void} + */ + _onPrivatizing () { + if (!this.switch.protector) { + return this._state('done') + } + + this.conn = this.switch.protector.protect(this.conn, (err) => { + if (err) { + this.emit('error', err) + return this._state('disconnect') + } + + this.log(`successfully privatized conn to ${this.theirB58Id}`) + this.conn.setPeerInfo(this.theirPeerInfo) + this._state('done') + }) + } +} + +module.exports = withIs(BaseConnection, { + className: 'BaseConnection', + symbolName: 'libp2p-switch/BaseConnection' +}) diff --git a/src/connection/handler.js b/src/connection/handler.js new file mode 100644 index 0000000000..9443167f2d --- /dev/null +++ b/src/connection/handler.js @@ -0,0 +1,46 @@ +'use strict' + +const debug = require('debug') +const IncomingConnection = require('./incoming') +const observeConn = require('../observe-connection') + +function listener (_switch) { + const log = debug(`libp2p:switch:listener`) + + /** + * Takes a transport key and returns a connection handler function + * + * @param {string} transportKey The key of the transport to handle connections for + * @param {function} handler A custom handler to use + * @returns {function(Connection)} A connection handler function + */ + return (transportKey, handler) => { + /** + * Takes a base connection and manages listening behavior + * + * @param {Connection} conn The connection to manage + * @returns {void} + */ + return (conn) => { + // Add a transport level observer, if needed + const connection = transportKey ? observeConn(transportKey, null, conn, _switch.observer) : conn + + log('received incoming connection') + const connFSM = new IncomingConnection({ connection, _switch, transportKey }) + + connFSM.once('error', (err) => log(err)) + connFSM.once('private', (_conn) => { + // Use the custom handler, if it was provided + if (handler) { + return handler(_conn) + } + connFSM.encrypt() + }) + connFSM.once('encrypted', () => connFSM.upgrade()) + + connFSM.protect() + } + } +} + +module.exports = listener diff --git a/src/connection/incoming.js b/src/connection/incoming.js new file mode 100644 index 0000000000..21c0e124e4 --- /dev/null +++ b/src/connection/incoming.js @@ -0,0 +1,115 @@ +'use strict' + +const FSM = require('fsm-event') +const multistream = require('multistream-select') +const withIs = require('class-is') + +const BaseConnection = require('./base') + +class IncomingConnectionFSM extends BaseConnection { + constructor ({ connection, _switch, transportKey }) { + super({ + _switch, + name: `inc:${_switch._peerInfo.id.toB58String().slice(0, 8)}` + }) + this.conn = connection + this.theirPeerInfo = null + this.ourPeerInfo = this.switch._peerInfo + this.transportKey = transportKey + this.protocolMuxer = this.switch.protocolMuxer(this.transportKey) + this.msListener = new multistream.Listener() + + this._state = FSM('DIALED', { + DISCONNECTED: { }, + DIALED: { // Base connection to peer established + privatize: 'PRIVATIZING', + encrypt: 'ENCRYPTING' + }, + PRIVATIZING: { // Protecting the base connection + done: 'PRIVATIZED', + disconnect: 'DISCONNECTING' + }, + PRIVATIZED: { // Base connection is protected + encrypt: 'ENCRYPTING' + }, + ENCRYPTING: { // Encrypting the base connection + done: 'ENCRYPTED', + disconnect: 'DISCONNECTING' + }, + ENCRYPTED: { // Upgrading could not happen, the connection is encrypted and waiting + upgrade: 'UPGRADING', + disconnect: 'DISCONNECTING' + }, + UPGRADING: { // Attempting to upgrade the connection with muxers + done: 'MUXED' + }, + MUXED: { + disconnect: 'DISCONNECTING' + }, + DISCONNECTING: { // Shutting down the connection + done: 'DISCONNECTED' + } + }) + + this._state.on('PRIVATIZING', () => this._onPrivatizing()) + this._state.on('PRIVATIZED', () => this._onPrivatized()) + this._state.on('ENCRYPTING', () => this._onEncrypting()) + this._state.on('ENCRYPTED', () => { + this.log(`successfully encrypted connection to ${this.theirB58Id || 'unknown peer'}`) + this.emit('encrypted', this.conn) + }) + this._state.on('UPGRADING', () => this._onUpgrading()) + this._state.on('MUXED', () => { + this.log(`successfully muxed connection to ${this.theirB58Id || 'unknown peer'}`) + this.emit('muxed', this.conn) + }) + this._state.on('DISCONNECTING', () => { + if (this.theirPeerInfo) { + this.theirPeerInfo.disconnect() + } + this._state('done') + }) + } + + /** + * Attempts to encrypt `this.conn` with the Switch's crypto. + * + * @private + * @fires IncomingConnectionFSM#error + * @returns {void} + */ + _onEncrypting () { + this.log(`encrypting connection via ${this.switch.crypto.tag}`) + + this.msListener.addHandler(this.switch.crypto.tag, (protocol, _conn) => { + this.conn = this.switch.crypto.encrypt(this.ourPeerInfo.id, _conn, undefined, (err) => { + if (err) { + this.emit('error', err) + return this._state('disconnect') + } + this.conn.getPeerInfo((_, peerInfo) => { + this.theirPeerInfo = peerInfo + this._state('done') + }) + }) + }, null) + + // Start handling the connection + this.msListener.handle(this.conn, (err) => { + if (err) { + this.emit('crypto handshaking failed', err) + } + }) + } + + _onUpgrading () { + this.log('adding the protocol muxer to the connection') + this.protocolMuxer(this.conn, this.msListener) + this._state('done') + } +} + +module.exports = withIs(IncomingConnectionFSM, { + className: 'IncomingConnectionFSM', + symbolName: 'libp2p-switch/IncomingConnectionFSM' +}) diff --git a/src/connection/index.js b/src/connection/index.js new file mode 100644 index 0000000000..01a6ec9c16 --- /dev/null +++ b/src/connection/index.js @@ -0,0 +1,460 @@ +'use strict' + +const FSM = require('fsm-event') +const setImmediate = require('async/setImmediate') +const Circuit = require('libp2p-circuit') +const multistream = require('multistream-select') +const withIs = require('class-is') +const BaseConnection = require('./base') + +const observeConnection = require('../observe-connection') +const Errors = require('../errors') + +/** + * @typedef {Object} ConnectionOptions + * @property {Switch} _switch Our switch instance + * @property {PeerInfo} peerInfo The PeerInfo of the peer to dial + * @property {Muxer} muxer Optional - A muxed connection + */ + +/** + * ConnectionFSM handles the complex logic of managing a connection + * between peers. ConnectionFSM is internally composed of a state machine + * to help improve the usability and debuggability of connections. The + * state machine also helps to improve the ability to handle dial backoff, + * coalescing dials and dial locks. + */ +class ConnectionFSM extends BaseConnection { + /** + * @param {ConnectionOptions} param0 + * @constructor + */ + constructor ({ _switch, peerInfo, muxer }) { + super({ + _switch, + name: `out:${_switch._peerInfo.id.toB58String().slice(0, 8)}` + }) + + this.theirPeerInfo = peerInfo + this.theirB58Id = this.theirPeerInfo.id.toB58String() + + this.conn = null // The base connection + this.muxer = muxer // The upgraded/muxed connection + + let startState = 'DISCONNECTED' + if (this.muxer) { + startState = 'MUXED' + } + + this._state = FSM(startState, { + DISCONNECTED: { // No active connections exist for the peer + dial: 'DIALING', + disconnect: 'DISCONNECTED', + done: 'DISCONNECTED' + }, + DIALING: { // Creating an initial connection + abort: 'ABORTED', + // emit events for different transport dials? + done: 'DIALED', + error: 'ERRORED', + disconnect: 'DISCONNECTING' + }, + DIALED: { // Base connection to peer established + encrypt: 'ENCRYPTING', + privatize: 'PRIVATIZING' + }, + PRIVATIZING: { // Protecting the base connection + done: 'PRIVATIZED', + abort: 'ABORTED', + disconnect: 'DISCONNECTING' + }, + PRIVATIZED: { // Base connection is protected + encrypt: 'ENCRYPTING' + }, + ENCRYPTING: { // Encrypting the base connection + done: 'ENCRYPTED', + error: 'ERRORED', + disconnect: 'DISCONNECTING' + }, + ENCRYPTED: { // Upgrading could not happen, the connection is encrypted and waiting + upgrade: 'UPGRADING', + disconnect: 'DISCONNECTING' + }, + UPGRADING: { // Attempting to upgrade the connection with muxers + stop: 'CONNECTED', // If we cannot mux, stop upgrading + done: 'MUXED', + error: 'ERRORED' + }, + MUXED: { + disconnect: 'DISCONNECTING' + }, + CONNECTED: { // A non muxed connection is established + disconnect: 'DISCONNECTING' + }, + DISCONNECTING: { // Shutting down the connection + done: 'DISCONNECTED', + disconnect: 'DISCONNECTING' + }, + ABORTED: { }, // A severe event occurred + ERRORED: { // An error occurred, but future dials may be allowed + disconnect: 'DISCONNECTING' // There could be multiple options here, but this is a likely action + } + }) + + this._state.on('DISCONNECTED', () => this._onDisconnected()) + this._state.on('DIALING', () => this._onDialing()) + this._state.on('DIALED', () => this._onDialed()) + this._state.on('PRIVATIZING', () => this._onPrivatizing()) + this._state.on('PRIVATIZED', () => this._onPrivatized()) + this._state.on('ENCRYPTING', () => this._onEncrypting()) + this._state.on('ENCRYPTED', () => { + this.log(`successfully encrypted connection to ${this.theirB58Id}`) + this.emit('encrypted', this.conn) + }) + this._state.on('UPGRADING', () => this._onUpgrading()) + this._state.on('MUXED', () => { + this.log(`successfully muxed connection to ${this.theirB58Id}`) + this.emit('muxed', this.muxer) + }) + this._state.on('CONNECTED', () => { + this.log(`unmuxed connection opened to ${this.theirB58Id}`) + this.emit('unmuxed', this.conn) + }) + this._state.on('DISCONNECTING', () => this._onDisconnecting()) + this._state.on('ABORTED', () => this._onAborted()) + this._state.on('ERRORED', () => this._onErrored()) + this._state.on('error', (err) => this._onStateError(err)) + } + + /** + * Puts the state into its disconnecting flow + * + * @returns {void} + */ + close () { + this.log(`closing connection to ${this.theirB58Id}`) + this._state('disconnect') + } + + /** + * Puts the state into dialing mode + * + * @fires ConnectionFSM#Error May emit a DIAL_SELF error + * @returns {void} + */ + dial () { + if (this.theirB58Id === this.ourPeerInfo.id.toB58String()) { + return this.emit('error', Errors.DIAL_SELF()) + } + + this._state('dial') + } + + /** + * Initiates a handshake for the given protocol + * + * @param {string} protocol The protocol to negotiate + * @param {function(Error, Connection)} callback + * @returns {void} + */ + shake (protocol, callback) { + // If there is no protocol set yet, don't perform the handshake + if (!protocol) { + return callback(null, null) + } + + if (this.muxer && this.muxer.newStream) { + return this.muxer.newStream((err, stream) => { + if (err) { + return callback(err, null) + } + + this.log(`created new stream to ${this.theirB58Id}`) + this._protocolHandshake(protocol, stream, callback) + }) + } + + this.conn.setPeerInfo(this.theirPeerInfo) + this._protocolHandshake(protocol, this.conn, callback) + } + + /** + * Puts the state into muxing mode + * + * @returns {void} + */ + upgrade () { + this._state('upgrade') + } + + /** + * Event handler for dialing. Transitions state when successful. + * + * @private + * @fires ConnectionFSM#error + * @returns {void} + */ + _onDialing () { + this.log(`dialing ${this.theirB58Id}`) + + if (!this.switch.hasTransports()) { + this.emit('error', Errors.NO_TRANSPORTS_REGISTERED()) + return this._state('disconnect') + } + + const tKeys = this.switch.availableTransports(this.theirPeerInfo) + + const circuitEnabled = Boolean(this.switch.transports[Circuit.tag]) + let circuitTried = false + + const nextTransport = (key) => { + let transport = key + if (!transport) { + if (!circuitEnabled) { + this.emit('error', Errors.CONNECTION_FAILED( + new Error(`Circuit not enabled and all transports failed to dial peer ${this.theirB58Id}!`) + )) + return this._state('disconnect') + } + + if (circuitTried) { + this.emit('error', Errors.CONNECTION_FAILED( + new Error(`No available transports to dial peer ${this.theirB58Id}!`) + )) + return this._state('disconnect') + } + + this.log(`Falling back to dialing over circuit`) + this.theirPeerInfo.multiaddrs.add(`/p2p-circuit/ipfs/${this.theirB58Id}`) + circuitTried = true + transport = Circuit.tag + } + + this.log(`dialing transport ${transport}`) + this.switch.transport.dial(transport, this.theirPeerInfo, (err, _conn) => { + if (err) { + this.emit('error:connection_attempt_failed', err.errors || [err]) + this.log(err) + return nextTransport(tKeys.shift()) + } + + this.conn = observeConnection(transport, null, _conn, this.switch.observer) + this._state('done') + }) + } + + nextTransport(tKeys.shift()) + } + + /** + * Once a connection has been successfully dialed, the connection + * will be privatized or encrypted depending on the presence of the + * Switch.protector. + * + * @returns {void} + */ + _onDialed () { + this.log(`successfully dialed ${this.theirB58Id}`) + + this.emit('connected', this.conn) + } + + /** + * Event handler for disconnecting. Handles any needed cleanup + * + * @returns {void} + */ + _onDisconnecting () { + this.log(`disconnecting from ${this.theirB58Id}`) + + // Issue disconnects on both Peers + if (this.theirPeerInfo) { + this.theirPeerInfo.disconnect() + } + + // Clean up stored connections + if (this.muxer) { + this.muxer.end() + } + + delete this.switch.muxedConns[this.theirB58Id] + delete this.switch.conns[this.theirB58Id] + delete this.muxer + delete this.conn + + this._state('done') + + setImmediate(() => this.switch.emit('peer-mux-closed', this.theirPeerInfo)) + } + + /** + * Attempts to encrypt `this.conn` with the Switch's crypto. + * + * @private + * @fires ConnectionFSM#error + * @returns {void} + */ + _onEncrypting () { + const msDialer = new multistream.Dialer() + msDialer.handle(this.conn, (err) => { + if (err) { + this.emit('error', Errors.maybeUnexpectedEnd(err)) + return this._state('disconnect') + } + + this.log('selecting crypto %s to %s', this.switch.crypto.tag, this.theirB58Id) + + msDialer.select(this.switch.crypto.tag, (err, _conn) => { + if (err) { + this._state('disconnect') + return this.emit('error', Errors.maybeUnexpectedEnd(err)) + } + + const conn = observeConnection(null, this.switch.crypto.tag, _conn, this.switch.observer) + + this.conn = this.switch.crypto.encrypt(this.ourPeerInfo.id, conn, this.theirPeerInfo.id, (err) => { + if (err) { + this._state('disconnect') + return this.emit('error', err) + } + + this.conn.setPeerInfo(this.theirPeerInfo) + this._state('done') + }) + }) + }) + } + + /** + * Iterates over each Muxer on the Switch and attempts to upgrade + * the given `connection`. Successful muxed connections will be stored + * on the Switch.muxedConns with `b58Id` as their key for future reference. + * + * @private + * @returns {void} + */ + _onUpgrading () { + const muxers = Object.keys(this.switch.muxers) + this.log(`upgrading connection to ${this.theirB58Id}`) + + if (muxers.length === 0) { + return this._state('stop') + } + + const msDialer = new multistream.Dialer() + msDialer.handle(this.conn, (err) => { + if (err) { + return this._didUpgrade(err) + } + + // 1. try to handshake in one of the muxers available + // 2. if succeeds + // - add the muxedConn to the list of muxedConns + // - add incomming new streams to connHandler + const nextMuxer = (key) => { + this.log('selecting %s', key) + msDialer.select(key, (err, _conn) => { + if (err) { + if (muxers.length === 0) { + return this._didUpgrade(err) + } + + return nextMuxer(muxers.shift()) + } + + // observe muxed connections + const conn = observeConnection(null, key, _conn, this.switch.observer) + + this.muxer = this.switch.muxers[key].dialer(conn) + this.switch.muxedConns[this.theirB58Id] = this + + this.muxer.once('close', () => { + delete this.muxer + this._state('disconnect') + }) + + // For incoming streams, in case identify is on + this.muxer.on('stream', (conn) => { + this.log(`new stream created via muxer to ${this.theirB58Id}`) + conn.setPeerInfo(this.theirPeerInfo) + this.switch.protocolMuxer(null)(conn) + }) + + setImmediate(() => this.switch.emit('peer-mux-established', this.theirPeerInfo)) + + this._didUpgrade(null) + }) + } + + nextMuxer(muxers.shift()) + }) + } + + /** + * Analyses the given error, if it exists, to determine where the state machine + * needs to go. + * + * @param {Error} err + * @returns {void} + */ + _didUpgrade (err) { + if (err) { + this.log('Error upgrading connection:', err) + this.switch.conns[this.theirB58Id] = this + this.emit('error:upgrade_failed', err) + // Cant upgrade, hold the encrypted connection + return this._state('stop') + } + + // move the state machine forward + this._state('done') + } + + /** + * Performs the protocol handshake for the given protocol + * over the given connection. The resulting error or connection + * will be returned via the callback. + * + * @private + * @param {string} protocol + * @param {Connection} connection + * @param {function(Error, Connection)} callback + * @returns {void} + */ + _protocolHandshake (protocol, connection, callback) { + const msDialer = new multistream.Dialer() + msDialer.handle(connection, (err) => { + if (err) { + return callback(err, null) + } + + msDialer.select(protocol, (err, _conn) => { + if (err) { + this.log(`could not perform protocol handshake: `, err) + return callback(err, null) + } + + const conn = observeConnection(null, protocol, _conn, this.switch.observer) + this.log(`successfully performed handshake of ${protocol} to ${this.theirB58Id}`) + this.emit('connection', conn) + callback(null, conn) + }) + }) + } + + /** + * Event handler for state transition errors + * + * @param {Error} err + * @returns {void} + */ + _onStateError (err) { + this.emit('error', Errors.INVALID_STATE_TRANSITION(err)) + this.log(err) + } +} + +module.exports = withIs(ConnectionFSM, { + className: 'ConnectionFSM', + symbolName: 'libp2p-switch/ConnectionFSM' +}) diff --git a/src/connection.js b/src/connection/manager.js similarity index 90% rename from src/connection.js rename to src/connection/manager.js index 395bae66c9..024c187936 100644 --- a/src/connection.js +++ b/src/connection/manager.js @@ -4,13 +4,14 @@ const identify = require('libp2p-identify') const multistream = require('multistream-select') const waterfall = require('async/waterfall') const debug = require('debug') -const log = debug('libp2p:switch:connection') +const log = debug('libp2p:switch:conn-manager') const once = require('once') const setImmediate = require('async/setImmediate') +const ConnectionFSM = require('../connection') const Circuit = require('libp2p-circuit') -const plaintext = require('./plaintext') +const plaintext = require('../plaintext') /** * Contains methods for binding handlers to the Switch @@ -89,7 +90,11 @@ class ConnectionManager { } const b58Str = peerInfo.id.toB58String() - this.switch.muxedConns[b58Str] = { muxer: muxedConn } + this.switch.muxedConns[b58Str] = new ConnectionFSM({ + _switch: this.switch, + peerInfo, + muxer: muxedConn + }) if (peerInfo.multiaddrs.size > 0) { // with incomming conn and through identify, going to pick one @@ -104,7 +109,7 @@ class ConnectionManager { } peerInfo = this.switch._peerBook.put(peerInfo) - muxedConn.on('close', () => { + muxedConn.once('close', () => { delete this.switch.muxedConns[b58Str] peerInfo.disconnect() peerInfo = this.switch._peerBook.put(peerInfo) @@ -123,7 +128,7 @@ class ConnectionManager { /** * Adds the `encrypt` handler for the given `tag` and also sets the - * Switch's crypto to past `encrypt` function + * Switch's crypto to passed `encrypt` function * * @param {String} tag * @param {function(PeerID, Connection, PeerId, Callback)} encrypt @@ -135,14 +140,6 @@ class ConnectionManager { encrypt = plaintext.encrypt } - this.switch.unhandle(this.switch.crypto.tag) - this.switch.handle(tag, (protocol, conn) => { - const myId = this.switch._peerInfo.id - const secure = encrypt(myId, conn, undefined, () => { - this.switch.protocolMuxer(null)(secure) - }) - }) - this.switch.crypto = {tag, encrypt} } diff --git a/src/dial.js b/src/dial.js deleted file mode 100644 index 8f24e32e91..0000000000 --- a/src/dial.js +++ /dev/null @@ -1,493 +0,0 @@ -'use strict' - -const multistream = require('multistream-select') -const Connection = require('interface-connection').Connection -const setImmediate = require('async/setImmediate') -const Circuit = require('libp2p-circuit') -const waterfall = require('async/waterfall') - -const debug = require('debug') -const log = debug('libp2p:switch:dial') - -const getPeerInfo = require('./get-peer-info') -const observeConnection = require('./observe-connection') -const UNEXPECTED_END = 'Unexpected end of input from reader.' - -/** - * Uses the given MultistreamDialer to select the protocol matching the given key - * - * A helper method to catch errors from pull streams ending unexpectedly - * Needed until https://github.com/dignifiedquire/pull-length-prefixed/pull/8 is merged. - * - * @param {MultistreamDialer} msDialer a multistream.Dialer - * @param {string} key The key type to select - * @param {function(Error)} callback Used for standard async flow - * @param {function(Error)} abort A callback to be used for ending the connection outright - * @returns {void} - */ -function selectSafe (msDialer, key, callback, abort) { - msDialer.select(key, (err, conn) => { - if (err === true) { - return abort(new Error(UNEXPECTED_END)) - } - - callback(err, conn) - }) -} - -/** - * Uses the given MultistreamDialer to handle the given connection - * - * A helper method to catch errors from pull streams ending unexpectedly - * Needed until https://github.com/dignifiedquire/pull-length-prefixed/pull/8 is merged - * - * @param {MultistreamDialer} msDialer - * @param {Connection} connection The connection to handle - * @param {function(Error)} callback Used for standard async flow - * @param {function(Error)} abort A callback to be used for ending the connection outright - * @returns {void} - */ -function handleSafe (msDialer, connection, callback, abort) { - msDialer.handle(connection, (err) => { - // Repackage errors from pull-streams ending unexpectedly. - // Needed until https://github.com/dignifiedquire/pull-length-prefixed/pull/8 is merged. - if (err === true) { - return abort(new Error(UNEXPECTED_END)) - } - - callback(err) - }) -} - -/** - * Manages dialing to another peer, including muxer upgrades - * and crypto management. The main entry point for dialing is - * Dialer.dial - * - * @param {Switch} _switch - * @param {PeerInfo} peerInfo - * @param {string} protocol - * @param {function(Error, Connection)} callback - */ -class Dialer { - constructor (_switch, peerInfo, ourPeerInfo, protocol, callback) { - this.switch = _switch - this.peerInfo = peerInfo - this.ourPeerInfo = ourPeerInfo - this.protocol = protocol - this.callback = callback - } - - /** - * Initializes a proxy connection and returns it. The connection is also immediately - * dialed. This will include establishing the base connection, crypto, muxing and the - * protocol handshake if all needed components have already been set. - * - * @returns {Connection} - */ - dial () { - const proxyConnection = new Connection() - proxyConnection.setPeerInfo(this.peerInfo) - - waterfall([ - (cb) => { - this._establishConnection(cb) - }, - (connection, cb) => { - if (connection) { - proxyConnection.setPeerInfo(this.peerInfo) - proxyConnection.setInnerConn(connection) - return cb(null, proxyConnection) - } - cb(null) - } - ], (err, connection) => { - if ((err && err.message === UNEXPECTED_END) || err === true) { - log('Connection dropped for %s', this.peerInfo.id.toB58String()) - return this.callback(null, null) - } - - this.callback(err, connection) - }) - - return proxyConnection - } - - /** - * Establishes a base connection and then continues to upgrade that connection - * including: crypto, muxing and the protocol handshake. If any upgrade is not - * yet available, or already exists, the upgrade will continue where it left off. - * - * @private - * @param {function(Error, Connection)} callback - * @returns {void} - */ - _establishConnection (callback) { - const b58Id = this.peerInfo.id.toB58String() - log('dialing %s', b58Id) - if (b58Id === this.ourPeerInfo.id.toB58String()) { - return callback(new Error('A node cannot dial itself')) - } - - waterfall([ - (cb) => { - // Start with a base connection, which includes encryption - this._createBaseConnection(b58Id, cb) - }, - (baseConnection, cb) => { - // Upgrade the connection with a muxer - this._createMuxedConnection(baseConnection, b58Id, cb) - }, - (muxer, cb) => { - // If we have no protocol, dont continue with the handshake - if (!this.protocol) { - return cb() - } - - // If we have a muxer, create a new stream, otherwise it's a standard connection - if (muxer.newStream) { - muxer.newStream((err, conn) => { - if (err) return cb(err) - - this._performProtocolHandshake(conn, cb) - }) - return - } - - this._performProtocolHandshake(muxer, cb) - } - ], (err, connection) => { - callback(err, connection) - }) - } - - /** - * If the base connection already exists to the PeerId key, `b58Id`, - * it will be returned in the callback. If no connection exists, one will - * be attempted via Dialer.attemptDial. - * - * @private - * @param {string} b58Id - * @param {function(Error, Connection)} callback - * @returns {void} - */ - _createBaseConnection (b58Id, callback) { - const baseConnection = this.switch.conns[b58Id] - const muxedConnection = this.switch.muxedConns[b58Id] - - // if the muxed connection exists, dont return a connection, - // _createMuxedConnection will get the connection - if (muxedConnection) { - return callback(null, null) - } - if (baseConnection) { - this.switch.conns[b58Id] = undefined - return callback(null, baseConnection) - } - - waterfall([ - (cb) => { - this._attemptDial(cb) - }, - (baseConnection, cb) => { - // Create a private connection if it's needed - this._createPrivateConnection(baseConnection, cb) - }, - (connection, cb) => { - // Add the Switch's crypt encryption to the connection - this._encryptConnection(connection, cb) - } - ], (err, encryptedConnection) => { - if (err) { - return callback(err) - } - - callback(null, encryptedConnection) - }) - } - - /** - * If the switch has a private network protector, `switch.protector`, its `protect` - * method will be called with the given connection. The resulting, wrapped connection - * will be returned via the callback. - * - * @param {Connection} connection The connection to protect - * @param {function(Error, Connection)} callback - * @returns {void} - */ - _createPrivateConnection (connection, callback) { - if (this.switch.protector === null) { - return callback(null, connection) - } - - // If the switch has a protector, be private - const protectedConnection = this.switch.protector.protect(connection, (err) => { - if (err) { - return callback(err) - } - - protectedConnection.setPeerInfo(this.peerInfo) - callback(null, protectedConnection) - }) - } - - /** - * If the given PeerId key, `b58Id`, has an existing muxed connection - * it will be returned via the callback, otherwise the connection - * upgrade will be initiated via Dialer.attemptMuxerUpgrade. - * - * @private - * @param {Connection} connection - * @param {string} b58Id - * @param {function(Error, Connection)} callback - * @returns {void} - */ - _createMuxedConnection (connection, b58Id, callback) { - const muxedConnection = this.switch.muxedConns[b58Id] - if (muxedConnection) { - return callback(null, muxedConnection.muxer) - } - - connection.setPeerInfo(this.peerInfo) - this._attemptMuxerUpgrade(connection, b58Id, (err, muxer) => { - if (err && !this.protocol) { - this.switch.conns[b58Id] = connection - return callback(null, null) - } - - if (err) { - log('muxer upgrade failed with error', err) - // couldn't upgrade to Muxer, it is ok, use the existing connection - return callback(null, connection) - } - - callback(null, muxer) - }, callback) - } - - /** - * Iterates over each Muxer on the Switch and attempts to upgrade - * the given `connection`. Successful muxed connections will be stored - * on the Switch.muxedConns with `b58Id` as their key for future reference. - * - * @private - * @param {Connection} connection - * @param {string} b58Id - * @param {function(Error, Connection)} callback - * @param {function(Error, Connection)} abort A callback to be used for ending the connection outright - * @returns {void} - */ - _attemptMuxerUpgrade (connection, b58Id, callback, abort) { - const muxers = Object.keys(this.switch.muxers) - - if (muxers.length === 0) { - return callback(new Error('no muxers available')) - } - - const msDialer = new multistream.Dialer() - handleSafe(msDialer, connection, (err) => { - if (err) { - return callback(new Error('multistream not supported')) - } - - // 1. try to handshake in one of the muxers available - // 2. if succeeds - // - add the muxedConn to the list of muxedConns - // - add incomming new streams to connHandler - const nextMuxer = (key) => { - log('selecting %s', key) - selectSafe(msDialer, key, (err, _conn) => { - if (err) { - if (muxers.length === 0) { - return callback(new Error('could not upgrade to stream muxing')) - } - - return nextMuxer(muxers.shift()) - } - - // observe muxed connections - const conn = observeConnection(null, key, _conn, this.switch.observer) - - const muxedConn = this.switch.muxers[key].dialer(conn) - this.switch.muxedConns[b58Id] = { - muxer: muxedConn - } - - muxedConn.once('close', () => { - delete this.switch.muxedConns[b58Id] - this.peerInfo.disconnect() - this.switch._peerInfo.disconnect() - log(`closed connection to ${b58Id}`) - setImmediate(() => this.switch.emit('peer-mux-closed', this.peerInfo)) - }) - - // For incoming streams, in case identify is on - muxedConn.on('stream', (conn) => { - conn.setPeerInfo(this.peerInfo) - this.switch.protocolMuxer(null)(conn) - }) - - setImmediate(() => this.switch.emit('peer-mux-established', this.peerInfo)) - - callback(null, muxedConn) - }, abort) - } - - nextMuxer(muxers.shift()) - }, abort) - } - - /** - * Iterates over each Transport on the Switch and attempts to connect - * to the peer. Once a Transport succeeds, no additional Transports will - * be dialed. - * - * @private - * @param {function(Error, Connection)} callback - * @returns {void} - */ - _attemptDial (callback) { - if (!this.switch.hasTransports()) { - return callback(new Error('No transports registered, dial not possible')) - } - - const tKeys = this.switch.availableTransports(this.peerInfo) - - const circuitEnabled = Boolean(this.switch.transports[Circuit.tag]) - let circuitTried = false - - const nextTransport = (key) => { - let transport = key - const b58Id = this.peerInfo.id.toB58String() - if (!transport) { - if (!circuitEnabled) { - const msg = `Circuit not enabled and all transports failed to dial peer ${b58Id}!` - return callback(new Error(msg)) - } - - if (circuitTried) { - return callback(new Error(`No available transports to dial peer ${b58Id}!`)) - } - - log(`Falling back to dialing over circuit`) - this.peerInfo.multiaddrs.add(`/p2p-circuit/ipfs/${b58Id}`) - circuitTried = true - transport = Circuit.tag - } - - log(`dialing transport ${transport}`) - this.switch.transport.dial(transport, this.peerInfo, (err, _conn) => { - if (err) { - log(err) - return nextTransport(tKeys.shift()) - } - - const conn = observeConnection(transport, null, _conn, this.switch.observer) - callback(null, conn) - }) - } - - nextTransport(tKeys.shift()) - } - - /** - * Attempts to encrypt the given `connection` with the Switch's crypto. - * - * @private - * @param {Connection} connection - * @param {function(Error, Connection)} callback - * @returns {void} - */ - _encryptConnection (connection, callback) { - const msDialer = new multistream.Dialer() - handleSafe(msDialer, connection, (err) => { - if (err) { - return callback(err) - } - - const myId = this.switch._peerInfo.id - log('selecting crypto: %s', this.switch.crypto.tag) - - selectSafe(msDialer, this.switch.crypto.tag, (err, _conn) => { - if (err) { - return callback(err) - } - - const conn = observeConnection(null, this.switch.crypto.tag, _conn, this.switch.observer) - - const encryptedConnection = this.switch.crypto.encrypt(myId, conn, this.peerInfo.id, (err) => { - if (err) { - return callback(err) - } - - encryptedConnection.setPeerInfo(this.peerInfo) - callback(null, encryptedConnection) - }) - }, callback) - }, callback) - } - - /** - * Initiates a handshake for the Dialer's set protocol - * - * @private - * @param {Connection} connection - * @param {function(Error, Connection)} callback - * @returns {void} - */ - _performProtocolHandshake (connection, callback) { - // If there is no protocol set yet, don't perform the handshake - if (!this.protocol) { - callback() - } - - const msDialer = new multistream.Dialer() - handleSafe(msDialer, connection, (err) => { - if (err) { - return callback(err) - } - - selectSafe(msDialer, this.protocol, (err, _conn) => { - if (err) { - log(`could not perform protocol handshake: `, err) - return callback(err) - } - const conn = observeConnection(null, this.protocol, _conn, this.switch.observer) - callback(null, conn) - }, callback) - }, callback) - } -} - -/** - * Returns a Dialer generator that when called, will immediately begin dialing - * fo the given `peer`. - * - * @param {Switch} _switch - * @returns {function(PeerInfo, string, function(Error, Connection))} - */ -function dial (_switch) { - /** - * Creates a new dialer and immediately begins dialing to the given `peer` - * - * @param {PeerInfo} peer - * @param {string} protocol - * @param {function(Error, Connection)} callback - * @returns {Connection} - */ - return (peer, protocol, callback) => { - if (typeof protocol === 'function') { - callback = protocol - protocol = null - } - - callback = callback || function noop () {} - - const peerInfo = getPeerInfo(peer, _switch._peerBook) - const dialer = new Dialer(_switch, peerInfo, _switch._peerInfo, protocol, callback) - - return dialer.dial() - } -} - -module.exports = dial diff --git a/src/dialer.js b/src/dialer.js new file mode 100644 index 0000000000..1a60ccedd2 --- /dev/null +++ b/src/dialer.js @@ -0,0 +1,108 @@ +'use strict' + +const Connection = require('interface-connection').Connection +const ConnectionFSM = require('./connection') +const getPeerInfo = require('./get-peer-info') +const once = require('once') + +const debug = require('debug') +const log = debug('libp2p:switch:dial') + +function maybePerformHandshake ({ protocol, proxyConnection, connection, callback }) { + if (protocol) { + return connection.shake(protocol, (err, conn) => { + if (!conn) { + return callback(err) + } + + proxyConnection.setPeerInfo(connection.theirPeerInfo) + proxyConnection.setInnerConn(conn) + callback(null, proxyConnection) + }) + } + + callback() +} + +/** + * Returns a Dialer generator that when called, will immediately begin dialing + * to the given `peer`. + * + * @param {Switch} _switch + * @param {Boolean} returnFSM Whether or not to return an fsm instead of a Connection + * @returns {function(PeerInfo, string, function(Error, Connection))} + */ +function dial (_switch, returnFSM) { + /** + * Creates a new dialer and immediately begins dialing to the given `peer` + * + * @param {PeerInfo} peer + * @param {string} protocol + * @param {function(Error, Connection)} callback + * @returns {Connection} + */ + return (peer, protocol, callback) => { + if (typeof protocol === 'function') { + callback = protocol + protocol = null + } + + callback = once(callback || function noop () {}) + + const peerInfo = getPeerInfo(peer, _switch._peerBook) + const b58Id = peerInfo.id.toB58String() + + log(`dialing to ${b58Id.slice(0, 8)} with protocol ${protocol || 'unknown'}`) + + let connection = _switch.muxedConns[b58Id] || _switch.conns[b58Id] + + if (!ConnectionFSM.isConnectionFSM(connection)) { + connection = new ConnectionFSM({ + _switch, + peerInfo, + muxer: _switch.muxedConns[b58Id] || null + }) + connection.once('error', (err) => callback(err)) + connection.once('connected', () => connection.protect()) + connection.once('private', () => connection.encrypt()) + connection.once('encrypted', () => connection.upgrade()) + connection.once('muxed', () => { + maybePerformHandshake({ + protocol, + proxyConnection, + connection, + callback + }) + }) + connection.once('unmuxed', () => { + maybePerformHandshake({ + protocol, + proxyConnection, + connection, + callback + }) + }) + } + + const proxyConnection = new Connection() + proxyConnection.setPeerInfo(peerInfo) + + setImmediate(() => { + // If we have a muxed connection, attempt the protocol handshake + if (connection.getState() === 'MUXED') { + maybePerformHandshake({ + protocol, + proxyConnection, + connection, + callback + }) + } else { + connection.dial() + } + }) + + return returnFSM ? connection : proxyConnection + } +} + +module.exports = dial diff --git a/src/errors.js b/src/errors.js index bbc1193c03..22833bd238 100644 --- a/src/errors.js +++ b/src/errors.js @@ -1,3 +1,17 @@ 'use strict' +const errCode = require('err-code') + module.exports.PROTECTOR_REQUIRED = 'No protector provided with private network enforced' +module.exports.CONNECTION_FAILED = (err) => errCode(err, 'CONNECTION_FAILED') +module.exports.DIAL_SELF = () => errCode(new Error('A node cannot dial itself'), 'DIAL_SELF') +module.exports.NO_TRANSPORTS_REGISTERED = () => errCode(new Error('No transports registered, dial not possible'), 'NO_TRANSPORTS_REGISTERED') +module.exports.UNEXPECTED_END = () => errCode(new Error('Unexpected end of input from reader.'), 'UNEXPECTED_END') +module.exports.INVALID_STATE_TRANSITION = (err) => errCode(err, 'INVALID_STATE_TRANSITION') + +module.exports.maybeUnexpectedEnd = (err) => { + if (err === true) { + return module.exports.UNEXPECTED_END() + } + return err +} diff --git a/src/index.js b/src/index.js index a3db99d3d0..d0744bf2f4 100644 --- a/src/index.js +++ b/src/index.js @@ -1,20 +1,30 @@ 'use strict' -const EE = require('events').EventEmitter +const FSM = require('fsm-event') +const EventEmitter = require('events').EventEmitter const each = require('async/each') const series = require('async/series') const TransportManager = require('./transport') -const ConnectionManager = require('./connection') +const ConnectionManager = require('./connection/manager') const getPeerInfo = require('./get-peer-info') -const dial = require('./dial') +const dial = require('./dialer') +const connectionHandler = require('./connection/handler') const ProtocolMuxer = require('./protocol-muxer') const plaintext = require('./plaintext') const Observer = require('./observer') const Stats = require('./stats') const assert = require('assert') const Errors = require('./errors') +const debug = require('debug') +const log = debug('libp2p:switch') +log.error = debug('libp2p:switch:error') -class Switch extends EE { +/** + * @fires Switch#stop Triggered when the switch has stopped + * @fires Switch#start Triggered when the switch has started + * @fires Switch#error Triggered whenever an error occurs + */ +class Switch extends EventEmitter { constructor (peerInfo, peerBook, options) { super() assert(peerInfo, 'You must provide a `peerInfo`') @@ -62,14 +72,49 @@ class Switch extends EE { this.stats = Stats(this.observer, this._options.stats) this.protocolMuxer = ProtocolMuxer(this.protocols, this.observer) - this.handle(this.crypto.tag, (protocol, conn) => { - const peerId = this._peerInfo.id - const wrapped = this.crypto.encrypt(peerId, conn, undefined, () => {}) - return this.protocolMuxer(null)(wrapped) - }) - // higher level (public) API this.dial = dial(this) + this.dialFSM = dial(this, true) + + // All purpose connection handler for managing incoming connections + this._connectionHandler = connectionHandler(this) + + // Setup the internal state + this.state = new FSM('STOPPED', { + STOPPED: { + start: 'STARTING', + stop: 'STOPPING' // ensures that any transports that were manually started are stopped + }, + STARTING: { + done: 'STARTED', + stop: 'STOPPING' + }, + STARTED: { + stop: 'STOPPING', + start: 'STARTED' + }, + STOPPING: { done: 'STOPPED' } + }) + this.state.on('STARTING', () => { + log('The switch is starting') + this._onStarting() + }) + this.state.on('STOPPING', () => { + log('The switch is stopping') + this._onStopping() + }) + this.state.on('STARTED', () => { + log('The switch has started') + this.emit('start') + }) + this.state.on('STOPPED', () => { + log('The switch has stopped') + this.emit('stop') + }) + this.state.on('error', (err) => { + log.error(err) + this.emit('error', err) + }) } /** @@ -90,52 +135,6 @@ class Switch extends EE { }) } - /** - * Starts the Switch listening on all available Transports - * - * @param {function(Error)} callback - * @returns {void} - */ - start (callback) { - each(this.availableTransports(this._peerInfo), (ts, cb) => { - // Listen on the given transport - this.transport.listen(ts, {}, null, cb) - }, callback) - } - - /** - * Stops all services and connections for the Switch - * - * @param {function(Error)} callback - * @returns {void} - */ - stop (callback) { - this.stats.stop() - series([ - (cb) => each(this.muxedConns, (conn, cb) => { - // If the connection was destroyed while we are hanging up, continue - if (!conn) { - return cb() - } - - conn.muxer.end((err) => { - // If OK things are fine, and someone just shut down - if (err && err.message !== 'Fatal error: OK') { - return cb(err) - } - cb() - }) - }, cb), - (cb) => { - each(this.transports, (transport, cb) => { - each(transport.listeners, (listener, cb) => { - listener.close(cb) - }, cb) - }, cb) - } - ], callback) - } - /** * Adds the `handlerFunc` and `matchFunc` to the Switch's protocol * handler list for the given `protocol`. If the `matchFunc` returns @@ -177,12 +176,12 @@ class Switch extends EE { const peerInfo = getPeerInfo(peer, this.peerBook) const key = peerInfo.id.toB58String() if (this.muxedConns[key]) { - const muxer = this.muxedConns[key].muxer - muxer.once('close', () => { + const conn = this.muxedConns[key] + conn.once('close', () => { delete this.muxedConns[key] callback() }) - muxer.end() + conn.close() } else { callback() } @@ -197,6 +196,81 @@ class Switch extends EE { const transports = Object.keys(this.transports).filter((t) => t !== 'Circuit') return transports && transports.length > 0 } + + /** + * Issues a start on the Switch state. + * + * @param {function} callback deprecated: Listening for the `error` and `start` events are recommended + * @returns {void} + */ + start (callback = () => {}) { + // Add once listener for deprecated callback support + this.once('start', callback) + + this.state('start') + } + + /** + * Issues a stop on the Switch state. + * + * @param {function} callback deprecated: Listening for the `error` and `stop` events are recommended + * @returns {void} + */ + stop (callback = () => {}) { + // Add once listener for deprecated callback support + this.once('stop', callback) + + this.state('stop') + } + + /** + * A listener that will start any necessary services and listeners + * + * @private + * @returns {void} + */ + _onStarting () { + each(this.availableTransports(this._peerInfo), (ts, cb) => { + // Listen on the given transport + this.transport.listen(ts, {}, null, cb) + }, (err) => { + if (err) { + log.error(err) + return this.emit('error', err) + } + this.state('done') + }) + } + + /** + * A listener that will turn off all running services and listeners + * + * @private + * @returns {void} + */ + _onStopping () { + this.stats.stop() + series([ + (cb) => each(this.muxedConns, (conn, cb) => { + // If the connection was destroyed while we are hanging up, continue + if (!conn) { + return cb() + } + + conn.once('close', cb) + conn.close() + }, cb), + (cb) => { + each(this.transports, (transport, cb) => { + each(transport.listeners, (listener, cb) => { + listener.close(cb) + }, cb) + }, cb) + } + ], (_) => { + this.state('done') + }) + } } module.exports = Switch diff --git a/src/limit-dialer/index.js b/src/limit-dialer/index.js index 33f423fa9e..7d0208c1d7 100644 --- a/src/limit-dialer/index.js +++ b/src/limit-dialer/index.js @@ -4,7 +4,7 @@ const map = require('async/map') const debug = require('debug') const once = require('once') -const log = debug('libp2p:swarm:dialer') +const log = debug('libp2p:switch:dialer') const DialQueue = require('./queue') diff --git a/src/limit-dialer/queue.js b/src/limit-dialer/queue.js index df23d1ff75..c204b620b2 100644 --- a/src/limit-dialer/queue.js +++ b/src/limit-dialer/queue.js @@ -6,7 +6,8 @@ const timeout = require('async/timeout') const queue = require('async/queue') const debug = require('debug') -const log = debug('libp2p:swarm:dialer:queue') +const log = debug('libp2p:switch:dialer:queue') +log.error = debug('libp2p:switch:dialer:queue:error') /** * Queue up the amount of dials to a given peer. @@ -37,26 +38,28 @@ class DialQueue { * @private */ _doWork (transport, addr, token, callback) { - log('work') + log(`${transport.constructor.name}:work:start`) this._dialWithTimeout(transport, addr, (err, conn) => { if (err) { - log('work:error') + log.error(`${transport.constructor.name}:work`, err) return callback(null, {error: err}) } if (token.cancel) { - log('work:cancel') + log(`${transport.constructor.name}:work:cancel`) // clean up already done dials pull(pull.empty(), conn) - // TODO: proper cleanup once the connection interface supports it - // return conn.close(() => callback(new Error('Manual cancel')) + // If we can close the connection, do it + if (typeof conn.close === 'function') { + return conn.close((_) => callback(null, {cancel: true})) + } return callback(null, {cancel: true}) } // one is enough token.cancel = true - log('work:success') + log(`${transport.constructor.name}:work:success`) const proxyConn = new Connection() proxyConn.setInnerConn(conn) diff --git a/src/protocol-muxer.js b/src/protocol-muxer.js index dce7f9f9eb..39c4a63d00 100644 --- a/src/protocol-muxer.js +++ b/src/protocol-muxer.js @@ -5,14 +5,19 @@ const observeConn = require('./observe-connection') const debug = require('debug') const log = debug('libp2p:switch:protocol-muxer') +log.error = debug('libp2p:switch:protocol-muxer:error') module.exports = function protocolMuxer (protocols, observer) { - return (transport) => (_parentConn) => { - const parentConn = transport - ? observeConn(transport, null, _parentConn, observer) - : _parentConn + return (transport) => (_parentConn, msListener) => { + const ms = msListener || new multistream.Listener() + let parentConn - const ms = new multistream.Listener() + // Only observe the transport if we have one, and there is not already a listener + if (transport && !msListener) { + parentConn = observeConn(transport, null, _parentConn, observer) + } else { + parentConn = _parentConn + } Object.keys(protocols).forEach((protocol) => { if (!protocol) { @@ -36,7 +41,7 @@ module.exports = function protocolMuxer (protocols, observer) { ms.handle(parentConn, (err) => { if (err) { - // the multistream handshake failed + log.error(`multistream handshake failed`, err) } }) } diff --git a/src/transport.js b/src/transport.js index 86ebb87a81..db11567764 100644 --- a/src/transport.js +++ b/src/transport.js @@ -43,6 +43,43 @@ class TransportManager { } } + /** + * Closes connections for the given transport key + * and removes it from the switch. + * + * @param {String} key + * @param {function(Error)} callback + * @returns {void} + */ + remove (key, callback) { + callback = callback || function () {} + + if (!this.switch.transports[key]) { + return callback() + } + + this.close(key, (err) => { + delete this.switch.transports[key] + callback(err) + }) + } + + /** + * Calls `remove` on each transport the switch has + * + * @param {function(Error)} callback + * @returns {void} + */ + removeAll (callback) { + const tasks = Object.keys(this.switch.transports).map((key) => { + return (cb) => { + this.remove(key, cb) + } + }) + + parallel(tasks, callback) + } + /** * For a given transport `key`, dial to all that transport multiaddrs * @@ -80,29 +117,13 @@ class TransportManager { * If a `handler` is not provided, the Switch's `protocolMuxer` will be used. * * @param {String} key - * @param {*} options + * @param {*} _options Currently ignored * @param {function(Connection)} handler * @param {function(Error)} callback * @returns {void} */ - listen (key, options, handler, callback) { - let muxerHandler - - // if no handler is passed, we pass conns to protocolMuxer - if (!handler) { - handler = this.switch.protocolMuxer(key) - } - - // If we have a protector make the connection private - if (this.switch.protector) { - muxerHandler = handler - handler = (parentConnection) => { - const connection = this.switch.protector.protect(parentConnection, () => { - // If we get an error here, we should stop talking to this peer - muxerHandler(connection) - }) - } - } + listen (key, _options, handler, callback) { + handler = this.switch._connectionHandler(key, handler) const transport = this.switch.transports[key] const multiaddrs = TransportManager.dialables( diff --git a/test/circuit-relay.node.js b/test/circuit-relay.node.js index 04bc2e7814..3a65c232ec 100644 --- a/test/circuit-relay.node.js +++ b/test/circuit-relay.node.js @@ -260,7 +260,7 @@ describe(`circuit`, function () { }) })) - after((done) => { + after(function (done) { parallel([ (cb) => bootstrapSwitch.stop(cb), (cb) => tcpSwitch1.stop(cb), @@ -271,9 +271,11 @@ describe(`circuit`, function () { }) it('should be able to dial tcp -> tcp', (done) => { - tcpSwitch2.once('peer-mux-established', (peerInfo) => { - expect(peerInfo.id.toB58String()).to.equal(tcpPeer1.id.toB58String()) - done() + tcpSwitch2.on('peer-mux-established', (peerInfo) => { + if (peerInfo.id.toB58String() === tcpPeer1.id.toB58String()) { + tcpSwitch2.removeAllListeners('peer-mux-established') + done() + } }) tcpSwitch1.dial(tcpPeer2, (err, connection) => { expect(err).to.not.exist() @@ -283,9 +285,11 @@ describe(`circuit`, function () { }) it('should be able to dial tcp -> ws over relay', (done) => { - wsSwitch1.once('peer-mux-established', (peerInfo) => { - expect(peerInfo.id.toB58String()).to.equal(tcpPeer1.id.toB58String()) - done() + wsSwitch1.on('peer-mux-established', (peerInfo) => { + if (peerInfo.id.toB58String() === tcpPeer1.id.toB58String()) { + wsSwitch1.removeAllListeners('peer-mux-established') + done() + } }) tcpSwitch1.dial(wsPeer1, (err, connection) => { expect(err).to.not.exist() @@ -295,9 +299,11 @@ describe(`circuit`, function () { }) it('should be able to dial ws -> ws', (done) => { - wsSwitch2.once('peer-mux-established', (peerInfo) => { - expect(peerInfo.id.toB58String()).to.equal(wsPeer1.id.toB58String()) - done() + wsSwitch2.on('peer-mux-established', (peerInfo) => { + if (peerInfo.id.toB58String() === wsPeer1.id.toB58String()) { + wsSwitch2.removeAllListeners('peer-mux-established') + done() + } }) wsSwitch1.dial(wsPeer2, (err, connection) => { expect(err).to.not.exist() @@ -307,10 +313,12 @@ describe(`circuit`, function () { }) it('should be able to dial ws -> tcp over relay', (done) => { - tcpSwitch1.once('peer-mux-established', (peerInfo) => { - expect(peerInfo.id.toB58String()).to.equal(wsPeer2.id.toB58String()) - expect(Object.keys(tcpSwitch1._peerBook.getAll())).to.include(wsPeer2.id.toB58String()) - done() + tcpSwitch1.on('peer-mux-established', (peerInfo) => { + if (peerInfo.id.toB58String() === wsPeer2.id.toB58String()) { + tcpSwitch1.removeAllListeners('peer-mux-established') + expect(Object.keys(tcpSwitch1._peerBook.getAll())).to.include(wsPeer2.id.toB58String()) + done() + } }) wsSwitch2.dial(tcpPeer1, (err, connection) => { expect(err).to.not.exist() diff --git a/test/connection.node.js b/test/connection.node.js new file mode 100644 index 0000000000..1e1d1009bd --- /dev/null +++ b/test/connection.node.js @@ -0,0 +1,392 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const expect = chai.expect +chai.use(require('dirty-chai')) +chai.use(require('chai-checkmark')) +const sinon = require('sinon') +const PeerBook = require('peer-book') +const WS = require('libp2p-websockets') +const parallel = require('async/parallel') +const secio = require('libp2p-secio') +const pull = require('pull-stream') +const multiplex = require('libp2p-mplex') +const spdy = require('libp2p-spdy') +const Connection = require('interface-connection').Connection +const Protector = require('libp2p-pnet') +const generatePSK = Protector.generate + +const psk = Buffer.alloc(95) +generatePSK(psk) + +const ConnectionFSM = require('../src/connection') +const Switch = require('../src') +const createInfos = require('./utils').createInfos + +describe('ConnectionFSM', () => { + let spdySwitch + let listenerSwitch + let dialerSwitch + + before((done) => { + createInfos(3, (err, infos) => { + if (err) { + return done(err) + } + + dialerSwitch = new Switch(infos.shift(), new PeerBook()) + dialerSwitch._peerInfo.multiaddrs.add('/ip4/0.0.0.0/tcp/15451/ws') + dialerSwitch.connection.crypto(secio.tag, secio.encrypt) + dialerSwitch.connection.addStreamMuxer(multiplex) + dialerSwitch.transport.add('ws', new WS()) + + listenerSwitch = new Switch(infos.shift(), new PeerBook()) + listenerSwitch._peerInfo.multiaddrs.add('/ip4/0.0.0.0/tcp/15452/ws') + listenerSwitch.connection.crypto(secio.tag, secio.encrypt) + listenerSwitch.connection.addStreamMuxer(multiplex) + listenerSwitch.transport.add('ws', new WS()) + + spdySwitch = new Switch(infos.shift(), new PeerBook()) + spdySwitch._peerInfo.multiaddrs.add('/ip4/0.0.0.0/tcp/15453/ws') + spdySwitch.connection.crypto(secio.tag, secio.encrypt) + spdySwitch.connection.addStreamMuxer(spdy) + spdySwitch.transport.add('ws', new WS()) + + parallel([ + (cb) => dialerSwitch.start(cb), + (cb) => listenerSwitch.start(cb), + (cb) => spdySwitch.start(cb) + ], (err) => { + done(err) + }) + }) + }) + + after((done) => { + parallel([ + (cb) => dialerSwitch.stop(cb), + (cb) => listenerSwitch.stop(cb), + (cb) => spdySwitch.stop(cb) + ], () => { + done() + }) + }) + + it('should have a default state of disconnected', () => { + const connection = new ConnectionFSM({ + _switch: dialerSwitch, + peerInfo: listenerSwitch._peerInfo + }) + + expect(connection.getState()).to.equal('DISCONNECTED') + }) + + it('should emit an error with an invalid transition', (done) => { + const connection = new ConnectionFSM({ + _switch: dialerSwitch, + peerInfo: listenerSwitch._peerInfo + }) + + expect(connection.getState()).to.equal('DISCONNECTED') + + connection.once('error', (err) => { + expect(err).to.have.property('code', 'INVALID_STATE_TRANSITION') + done() + }) + connection.upgrade() + }) + + it('.dial should create a basic connection', (done) => { + const connection = new ConnectionFSM({ + _switch: dialerSwitch, + peerInfo: listenerSwitch._peerInfo + }) + + connection.once('connected', (conn) => { + expect(conn).to.be.an.instanceof(Connection) + done() + }) + + connection.dial() + }) + + it('should emit warning on dial failed attempt', (done) => { + const connection = new ConnectionFSM({ + _switch: dialerSwitch, + peerInfo: listenerSwitch._peerInfo + }) + + const stub = sinon.stub(dialerSwitch.transport, 'dial').callsArgWith(2, { + errors: [ + new Error('address in use') + ] + }) + + connection.once('error:connection_attempt_failed', (errors) => { + expect(errors).to.have.length(1).mark() + stub.restore() + }) + + connection.once('error', (err) => { + expect(err).to.exist().mark() + }) + + expect(2).checks(done) + + connection.dial() + }) + + it('should be able to encrypt a basic connection', (done) => { + const connection = new ConnectionFSM({ + _switch: dialerSwitch, + peerInfo: listenerSwitch._peerInfo + }) + + connection.once('connected', (conn) => { + expect(conn).to.be.an.instanceof(Connection) + connection.encrypt() + }) + connection.once('encrypted', (conn) => { + expect(conn).to.be.an.instanceof(Connection) + done() + }) + + connection.dial() + }) + + it('should disconnect on encryption failure', (done) => { + const connection = new ConnectionFSM({ + _switch: dialerSwitch, + peerInfo: listenerSwitch._peerInfo + }) + + const stub = sinon.stub(dialerSwitch.crypto, 'encrypt') + .callsArgWith(3, new Error('fail encrypt')) + + connection.once('connected', (conn) => { + expect(conn).to.be.an.instanceof(Connection) + connection.encrypt() + }) + connection.once('close', () => { + stub.restore() + done() + }) + connection.once('encrypted', () => { + throw new Error('should not encrypt') + }) + + connection.dial() + }) + + it('should be able to upgrade an encrypted connection', (done) => { + const connection = new ConnectionFSM({ + _switch: dialerSwitch, + peerInfo: listenerSwitch._peerInfo + }) + + connection.once('connected', (conn) => { + expect(conn).to.be.an.instanceof(Connection) + connection.encrypt() + }) + connection.once('encrypted', (conn) => { + expect(conn).to.be.an.instanceof(Connection) + connection.upgrade() + }) + connection.once('muxed', (conn) => { + expect(conn.multicodec).to.equal(multiplex.multicodec) + done() + }) + + connection.dial() + }) + + it('should fail to upgrade a connection with incompatible muxers', (done) => { + const connection = new ConnectionFSM({ + _switch: dialerSwitch, + peerInfo: spdySwitch._peerInfo + }) + + connection.once('connected', (conn) => { + expect(conn).to.be.an.instanceof(Connection) + connection.encrypt() + }) + connection.once('encrypted', (conn) => { + expect(conn).to.be.an.instanceof(Connection) + connection.upgrade() + }) + connection.once('error:upgrade_failed', (err) => { + expect(err).to.exist() + done() + }) + + connection.dial() + }) + + it('should be able to handshake a protocol over a muxed connection', (done) => { + const connection = new ConnectionFSM({ + _switch: dialerSwitch, + peerInfo: listenerSwitch._peerInfo + }) + + listenerSwitch.handle('/muxed-conn-test/1.0.0', (_, conn) => { + return pull(conn, conn) + }) + + connection.once('connected', (conn) => { + expect(conn).to.be.an.instanceof(Connection) + connection.encrypt() + }) + connection.once('encrypted', (conn) => { + expect(conn).to.be.an.instanceof(Connection) + connection.upgrade() + }) + connection.once('muxed', (conn) => { + expect(conn.multicodec).to.equal(multiplex.multicodec) + + connection.shake('/muxed-conn-test/1.0.0', (err, protocolConn) => { + expect(err).to.not.exist() + expect(protocolConn).to.be.an.instanceof(Connection) + done() + }) + }) + + connection.dial() + }) + + it('should not return a connection when handshaking with no protocol', (done) => { + const connection = new ConnectionFSM({ + _switch: dialerSwitch, + peerInfo: listenerSwitch._peerInfo + }) + + listenerSwitch.handle('/muxed-conn-test/1.0.0', (_, conn) => { + return pull(conn, conn) + }) + + connection.once('connected', (conn) => { + expect(conn).to.be.an.instanceof(Connection) + connection.encrypt() + }) + connection.once('encrypted', (conn) => { + expect(conn).to.be.an.instanceof(Connection) + connection.upgrade() + }) + connection.once('muxed', (conn) => { + expect(conn.multicodec).to.equal(multiplex.multicodec) + + connection.shake(null, (err, protocolConn) => { + expect(err).to.not.exist() + expect(protocolConn).to.not.exist() + done() + }) + }) + + connection.dial() + }) + + describe('with no muxers', () => { + let oldMuxers + before(() => { + oldMuxers = dialerSwitch.muxers + dialerSwitch.muxers = {} + }) + + after(() => { + dialerSwitch.muxers = oldMuxers + }) + + it('should be able to handshake a protocol over a basic connection', (done) => { + const connection = new ConnectionFSM({ + _switch: dialerSwitch, + peerInfo: listenerSwitch._peerInfo + }) + + listenerSwitch.handle('/unmuxed-conn-test/1.0.0', (_, conn) => { + return pull(conn, conn) + }) + + connection.once('connected', (conn) => { + expect(conn).to.be.an.instanceof(Connection) + connection.encrypt() + }) + connection.once('encrypted', (conn) => { + expect(conn).to.be.an.instanceof(Connection) + connection.upgrade() + }) + connection.once('muxed', () => { + throw new Error('connection shouldnt be muxed') + }) + connection.once('unmuxed', (conn) => { + expect(conn).to.be.an.instanceof(Connection) + + connection.shake('/unmuxed-conn-test/1.0.0', (err, protocolConn) => { + expect(err).to.not.exist() + expect(protocolConn).to.be.an.instanceof(Connection) + done() + }) + }) + + connection.dial() + }) + }) + + describe('with a protector', () => { + // Restart the switches with protectors + before((done) => { + parallel([ + (cb) => dialerSwitch.stop(cb), + (cb) => listenerSwitch.stop(cb) + ], () => { + dialerSwitch.protector = new Protector(psk) + listenerSwitch.protector = new Protector(psk) + + parallel([ + (cb) => dialerSwitch.start(cb), + (cb) => listenerSwitch.start(cb) + ], done) + }) + }) + + it('should be able to protect a basic connection', (done) => { + const connection = new ConnectionFSM({ + _switch: dialerSwitch, + peerInfo: listenerSwitch._peerInfo + }) + + connection.once('private', (conn) => { + expect(conn).to.be.an.instanceof(Connection) + done() + }) + + connection.once('connected', (conn) => { + expect(conn).to.be.an.instanceof(Connection) + connection.protect() + }) + + connection.dial() + }) + + it('should be able to encrypt a protected connection', (done) => { + const connection = new ConnectionFSM({ + _switch: dialerSwitch, + peerInfo: listenerSwitch._peerInfo + }) + + connection.once('connected', (conn) => { + expect(conn).to.be.an.instanceof(Connection) + connection.protect() + }) + connection.once('private', (conn) => { + expect(conn).to.be.an.instanceof(Connection) + connection.encrypt() + }) + connection.once('encrypted', (conn) => { + expect(conn).to.be.an.instanceof(Connection) + done() + }) + + connection.dial() + }) + }) +}) diff --git a/test/dial-fsm.node.js b/test/dial-fsm.node.js new file mode 100644 index 0000000000..8c188b5e4d --- /dev/null +++ b/test/dial-fsm.node.js @@ -0,0 +1,136 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(require('chai-checkmark')) +chai.use(dirtyChai) +const PeerBook = require('peer-book') +const parallel = require('async/parallel') +const WS = require('libp2p-websockets') +const TCP = require('libp2p-tcp') +const secio = require('libp2p-secio') +const multiplex = require('libp2p-mplex') + +const utils = require('./utils') +const createInfos = utils.createInfos +const Switch = require('../src') + +describe('dialFSM', () => { + let switchA + let switchB + let switchC + + before((done) => createInfos(3, (err, infos) => { + expect(err).to.not.exist() + + const peerA = infos[0] + const peerB = infos[1] + const peerC = infos[2] + + peerA.multiaddrs.add('/ip4/0.0.0.0/tcp/0') + peerB.multiaddrs.add('/ip4/0.0.0.0/tcp/0') + peerC.multiaddrs.add('/ip4/0.0.0.0/tcp/0/ws') + // Give peer C a tcp address we wont actually support + peerC.multiaddrs.add('/ip4/0.0.0.0/tcp/0') + + switchA = new Switch(peerA, new PeerBook()) + switchB = new Switch(peerB, new PeerBook()) + switchC = new Switch(peerC, new PeerBook()) + + switchA.transport.add('tcp', new TCP()) + switchB.transport.add('tcp', new TCP()) + switchC.transport.add('ws', new WS()) + + switchA.connection.crypto(secio.tag, secio.encrypt) + switchB.connection.crypto(secio.tag, secio.encrypt) + switchC.connection.crypto(secio.tag, secio.encrypt) + + switchA.connection.addStreamMuxer(multiplex) + switchB.connection.addStreamMuxer(multiplex) + switchC.connection.addStreamMuxer(multiplex) + + switchA.connection.reuse() + switchB.connection.reuse() + switchC.connection.reuse() + + parallel([ + (cb) => switchA.transport.listen('tcp', {}, null, cb), + (cb) => switchB.transport.listen('tcp', {}, null, cb), + (cb) => switchC.transport.listen('ws', {}, null, cb) + ], done) + })) + + after((done) => { + parallel([ + (cb) => switchA.stop(cb), + (cb) => switchB.stop(cb), + (cb) => switchC.stop(cb) + ], done) + }) + + it('should emit `error:connection_attempt_failed` when a transport fails to dial', (done) => { + switchC.handle('/warn/1.0.0', () => { }) + + const connFSM = switchA.dialFSM(switchC._peerInfo, '/warn/1.0.0', () => { }) + + connFSM.once('error:connection_attempt_failed', (errors) => { + expect(errors).to.be.an('array') + expect(errors).to.have.length(1) + done() + }) + }) + + it('should emit an `error` event when a it cannot dial a peer', (done) => { + switchC.handle('/error/1.0.0', () => { }) + + const connFSM = switchA.dialFSM(switchC._peerInfo, '/error/1.0.0', () => { }) + + connFSM.once('error', (err) => { + expect(err).to.be.exist() + expect(err).to.have.property('code', 'CONNECTION_FAILED') + done() + }) + }) + + it('should emit a `closed` event when closed', (done) => { + switchB.handle('/closed/1.0.0', () => { }) + + const connFSM = switchA.dialFSM(switchB._peerInfo, '/closed/1.0.0', (err) => { + expect(err).to.not.exist() + expect(switchA.muxedConns).to.have.property(switchB._peerInfo.id.toB58String()) + connFSM.close() + }) + + connFSM.once('close', () => { + expect(switchA.muxedConns).to.not.have.any.keys([ + switchB._peerInfo.id.toB58String() + ]) + done() + }) + }) + + it('should close when the receiver closes', (done) => { + const peerIdA = switchA._peerInfo.id.toB58String() + + // wait for the expects to happen + expect(2).checks(done) + + switchB.handle('/closed/1.0.0', () => { }) + switchB.on('peer-mux-established', (peerInfo) => { + if (peerInfo.id.toB58String() === peerIdA) { + switchB.removeAllListeners('peer-mux-established') + expect(switchB.muxedConns).to.have.property(peerIdA).mark() + switchB.muxedConns[peerIdA].close() + } + }) + + const connFSM = switchA.dialFSM(switchB._peerInfo, '/closed/1.0.0', () => { }) + connFSM.once('close', () => { + expect(switchA.muxedConns).to.not.have.any.keys([ + switchB._peerInfo.id.toB58String() + ]).mark() + }) + }) +}) diff --git a/test/node.js b/test/node.js index 9f624d6189..534f7e0b2e 100644 --- a/test/node.js +++ b/test/node.js @@ -1,5 +1,7 @@ 'use strict' +require('./connection.node') +require('./dial-fsm.node') require('./pnet.node') require('./transports.node') require('./stream-muxers.node') diff --git a/test/swarm-no-muxing.node.js b/test/swarm-no-muxing.node.js index 77e3a5c082..9fbda728ca 100644 --- a/test/swarm-no-muxing.node.js +++ b/test/swarm-no-muxing.node.js @@ -48,7 +48,7 @@ describe('Switch (no Stream Multiplexing)', () => { it('handle a protocol', (done) => { switchB.handle('/bananas/1.0.0', (protocol, conn) => pull(conn, conn)) - expect(Object.keys(switchB.protocols).length).to.equal(2) + expect(switchB.protocols).to.have.all.keys('/bananas/1.0.0') done() }) diff --git a/test/transports.node.js b/test/transports.node.js index e0ed175295..98347f1a52 100644 --- a/test/transports.node.js +++ b/test/transports.node.js @@ -44,6 +44,25 @@ describe('transports', () => { }) }) + it('.transport.remove', () => { + switchA.transport.add('test', new t.C()) + expect(switchA.transports).to.have.any.keys(['test']) + switchA.transport.remove('test') + expect(switchA.transports).to.not.have.any.keys(['test']) + // verify remove fails silently + switchA.transport.remove('test') + }) + + it('.transport.removeAll', (done) => { + switchA.transport.add('test', new t.C()) + switchA.transport.add('test2', new t.C()) + expect(switchA.transports).to.have.any.keys(['test', 'test2']) + switchA.transport.removeAll(() => { + expect(switchA.transports).to.not.have.any.keys(['test', 'test2']) + done() + }) + }) + it('.transport.add', () => { switchA.transport.add(t.n, new t.C()) expect(Object.keys(switchA.transports).length).to.equal(1)