Skip to content

Commit

Permalink
feat: make switch a state machine (libp2p#278)
Browse files Browse the repository at this point in the history
* feat: add basic state machine functionality to switch

* feat: make connections state machines

* refactor: clean up logs

* feat: add dialFSM to the switch

* feat: add better support for closing connections

* test: add tests for some uncovered lines

* feat: add warning emitter for muxer upgrade failed

* docs: update readme
  • Loading branch information
jacobheun authored Oct 19, 2018
1 parent e3b4b42 commit 7dcabdd
Show file tree
Hide file tree
Showing 21 changed files with 1,714 additions and 668 deletions.
142 changes: 87 additions & 55 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@ libp2p-switch is used by [libp2p](https://github.com/libp2p/js-libp2p) but it ca
- [Usage](#usage)
- [Create a libp2p switch](#create-a-libp2p-switch)
- [API](#api)
- [`switch.dial(peer, protocol, callback)`](#swarmdialpi-protocol-callback)
- [`switch.hangUp(peer, callback)`](#swarmhanguppi-callback)
- [`switch.handle(protocol, handler)`](#swarmhandleprotocol-handler)
- [`switch.unhandle(protocol)`](#swarmunhandleprotocol)
- [`switch.start(callback)`](#swarmlistencallback)
- [`switch.stop(callback)`](#swarmclosecallback)
- [`switch.connection`](#connection)
- [`switch.connection`](#switchconnection)
- [`switch.dial(peer, protocol, callback)`](#switchdialpeer-protocol-callback)
- [`switch.dialFSM(peer, protocol, callback)`](#switchdialfsmpeer-protocol-callback)
- [`switch.handle(protocol, handlerFunc, matchFunc)`](#switchhandleprotocol-handlerfunc-matchfunc)
- [`switch.hangUp(peer, callback)`](#switchhanguppeer-callback)
- [`switch.start(callback)`](#switchstartcallback)
- [`switch.stop(callback)`](#switchstopcallback)
- [`switch.stats`](#stats-api)
- [Internal Transports API](#transports)
- [Design Notes](#designnotes)
- [`switch.unhandle(protocol)`](#switchunhandleprotocol)
- [Internal Transports API](#internal-transports-api)
- [Design Notes](#design-notes)
- [Multitransport](#multitransport)
- [Connection upgrades](#connection-upgrades)
- [Identify](#identify)
Expand Down Expand Up @@ -86,6 +87,46 @@ tests]([./test/pnet.node.js]).
- peerInfo is a [PeerInfo](https://github.com/libp2p/js-peer-info) object that has the peer information.
- peerBook is a [PeerBook](https://github.com/libp2p/js-peer-book) object that stores all the known peers.

### `switch.connection`

##### `switch.connection.addUpgrade()`

A connection upgrade must be able to receive and return something that implements the [interface-connection](https://github.com/libp2p/interface-connection) specification.

> **WIP**

##### `switch.connection.addStreamMuxer(muxer)`

Upgrading a connection to use a stream muxer is still considered an upgrade, but a special case since once this connection is applied, the returned obj will implement the [interface-stream-muxer](https://github.com/libp2p/interface-stream-muxer) spec.

- `muxer`

##### `switch.connection.reuse()`

Enable the identify protocol.

##### `switch.connection.crypto([tag, encrypt])`

Enable a specified crypto protocol. By default no encryption is used, aka `plaintext`. If called with no arguments it resets to use `plaintext`.

You can use for example [libp2p-secio](https://github.com/libp2p/js-libp2p-secio) like this

```js
const secio = require('libp2p-secio')
switch.connection.crypto(secio.tag, secio.encrypt)
```

##### `switch.connection.enableCircuitRelay(options, callback)`

Enable circuit relaying.

- `options`
- enabled - activates relay dialing and listening functionality
- hop - an object with two properties
- enabled - enables circuit relaying
- active - is it an active or passive relay (default false)
- `callback`

### `switch.dial(peer, protocol, callback)`

dial uses the best transport (whatever works first, in the future we can have some criteria), and jump starts the connection until the point where we have to negotiate the protocol. If a muxer is available, then drop the muxer onto that connection. Good to warm up connections or to check for connectivity. If we have already a muxer for that peerInfo, then do nothing.
Expand All @@ -94,13 +135,24 @@ dial uses the best transport (whatever works first, in the future we can have so
- `protocol`
- `callback`

### `switch.hangUp(peer, callback)`
### `switch.dialFSM(peer, protocol, callback)`

Hang up the muxed connection we have with the peer.
works like dial, but calls back with a [Connection State Machine](#connection-state-machine)

- `peer`: can be an instance of [PeerInfo][], [PeerId][] or [multiaddr][]
- `callback`
- `protocol`: String that defines the protocol (e.g '/ipfs/bitswap/1.1.0') to be used
- `callback`: Function with signature `function (err, connFSM) {}` where `connFSM` is a [Connection State Machine](#connection-state-machine)

#### Connection State Machine
Connection state machines emit a number of events that can be used to determine the current state of the connection
and to received the underlying connection that can be used to transfer data.

##### Events
- `error`: emitted whenever a fatal error occurs with the connection; the error will be emitted.
- `error:upgrade_failed`: emitted whenever the connection fails to upgrade with a muxer, this is not fatal.
- `error:connection_attempt_failed`: emitted whenever a dial attempt fails for a given transport. An array of errors is emitted.
- `connection`: emitted whenever a useable connection has been established; the underlying [Connection](https://github.com/libp2p/interface-connection) will be emitted.
- `close`: emitted when the connection has closed.

### `switch.handle(protocol, handlerFunc, matchFunc)`

Expand All @@ -110,68 +162,43 @@ Handle a new protocol.
- `handlerFunc` - function called when we receive a dial on `protocol. Signature must be `function (protocol, conn) {}`
- `matchFunc` - matchFunc for multistream-select
### `switch.unhandle(protocol)`

Unhandle a protocol.

- `protocol`

### `switch.on('peer-mux-established', (peer) => {})`

- `peer`: is instance of [PeerInfo][] that has info of the peer we have just established a muxed connection with.

### `switch.on('peer-mux-closed', (peer) => {})`

- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a muxed connection.

### `switch.start(callback)`

Start listening on all added transports that are available on the current `peerInfo`.

### `switch.stop(callback)`
### `switch.hangUp(peer, callback)`

Close all the listeners and muxers.
Hang up the muxed connection we have with the peer.

- `peer`: can be an instance of [PeerInfo][], [PeerId][] or [multiaddr][]
- `callback`

### `switch.connection`
### `switch.on('error', (err) => {})`

##### `switch.connection.addUpgrade()`
Emitted when the switch encounters an error.

A connection upgrade must be able to receive and return something that implements the [interface-connection](https://github.com/libp2p/interface-connection) specification.
- `err`: instance of [Error][]

> **WIP**
### `switch.on('peer-mux-established', (peer) => {})`

##### `switch.connection.addStreamMuxer(muxer)`
- `peer`: is instance of [PeerInfo][] that has info of the peer we have just established a muxed connection with.

Upgrading a connection to use a stream muxer is still considered an upgrade, but a special case since once this connection is applied, the returned obj will implement the [interface-stream-muxer](https://github.com/libp2p/interface-stream-muxer) spec.
### `switch.on('peer-mux-closed', (peer) => {})`

- `muxer`
- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a muxed connection.

##### `switch.connection.reuse()`
### `switch.on('start', () => {})`

Enable the identify protocol.
Emitted when the switch has successfully started.

##### `switch.connection.crypto([tag, encrypt])`
### `switch.on('stop', () => {})`

Enable a specified crypto protocol. By default no encryption is used, aka `plaintext`. If called with no arguments it resets to use `plaintext`.
Emitted when the switch has successfully stopped.

You can use for example [libp2p-secio](https://github.com/libp2p/js-libp2p-secio) like this
### `switch.start(callback)`

```js
const secio = require('libp2p-secio')
switch.connection.crypto(secio.tag, secio.encrypt)
```
Start listening on all added transports that are available on the current `peerInfo`.

##### `switch.connection.enableCircuitRelay(options, callback)`
### `switch.stop(callback)`

Enable circuit relaying.
Close all the listeners and muxers.

- `options`
- enabled - activates relay dialing and listening functionality
- hop - an object with two properties
- enabled - enables circuit relaying
- active - is it an active or passive relay (default false)
- `callback`

### Stats API
Expand Down Expand Up @@ -278,6 +305,11 @@ Each one of these values is [an exponential moving-average instance](https://git

Stats are not updated in real-time. Instead, measurements are buffered and stats are updated at an interval. The maximum interval can be defined through the `Switch` constructor option `stats.computeThrottleTimeout`, defined in miliseconds.

### `switch.unhandle(protocol)`

Unhandle a protocol.

- `protocol`

### Internal Transports API

Expand Down
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"devDependencies": {
"aegir": "^15.1.0",
"chai": "^4.1.2",
"chai-checkmark": "^1.0.1",
"dirty-chai": "^2.0.1",
"libp2p-mplex": "~0.8.2",
"libp2p-pnet": "~0.1.0",
Expand All @@ -54,7 +55,10 @@
"dependencies": {
"async": "^2.6.1",
"big.js": "^5.1.2",
"class-is": "^1.1.0",
"debug": "^3.1.0",
"err-code": "^1.1.2",
"fsm-event": "^2.1.0",
"hashlru": "^2.2.1",
"interface-connection": "~0.3.2",
"ip-address": "^5.8.9",
Expand Down
103 changes: 103 additions & 0 deletions src/connection/base.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
'use strict'

const EventEmitter = require('events').EventEmitter
const debug = require('debug')
const withIs = require('class-is')

class BaseConnection extends EventEmitter {
constructor ({ _switch, name }) {
super()

this.switch = _switch
this.ourPeerInfo = this.switch._peerInfo
this.log = debug(`libp2p:conn:${name}`)
}

/**
* Gets the current state of the connection
*
* @returns {string} The current state of the connection
*/
getState () {
return this._state._state
}

/**
* Puts the state into encrypting mode
*
* @returns {void}
*/
encrypt () {
this._state('encrypt')
}

/**
* Puts the state into privatizing mode
*
* @returns {void}
*/
protect () {
this._state('privatize')
}

/**
* Puts the state into muxing mode
*
* @returns {void}
*/
upgrade () {
this._state('upgrade')
}

/**
* Event handler for disconnected.
*
* @fires BaseConnection#close
* @returns {void}
*/
_onDisconnected () {
this.log(`disconnected from ${this.theirB58Id}`)
this.emit('close')
this.removeAllListeners()
}

/**
* Event handler for privatized
*
* @fires BaseConnection#private
* @returns {void}
*/
_onPrivatized () {
this.log(`successfully privatized incoming connection`)
this.emit('private', this.conn)
}

/**
* Wraps this.conn with the Switch.protector for private connections
*
* @private
* @fires ConnectionFSM#error
* @returns {void}
*/
_onPrivatizing () {
if (!this.switch.protector) {
return this._state('done')
}

this.conn = this.switch.protector.protect(this.conn, (err) => {
if (err) {
this.emit('error', err)
return this._state('disconnect')
}

this.log(`successfully privatized conn to ${this.theirB58Id}`)
this.conn.setPeerInfo(this.theirPeerInfo)
this._state('done')
})
}
}

module.exports = withIs(BaseConnection, {
className: 'BaseConnection',
symbolName: 'libp2p-switch/BaseConnection'
})
46 changes: 46 additions & 0 deletions src/connection/handler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
'use strict'

const debug = require('debug')
const IncomingConnection = require('./incoming')
const observeConn = require('../observe-connection')

function listener (_switch) {
const log = debug(`libp2p:switch:listener`)

/**
* Takes a transport key and returns a connection handler function
*
* @param {string} transportKey The key of the transport to handle connections for
* @param {function} handler A custom handler to use
* @returns {function(Connection)} A connection handler function
*/
return (transportKey, handler) => {
/**
* Takes a base connection and manages listening behavior
*
* @param {Connection} conn The connection to manage
* @returns {void}
*/
return (conn) => {
// Add a transport level observer, if needed
const connection = transportKey ? observeConn(transportKey, null, conn, _switch.observer) : conn

log('received incoming connection')
const connFSM = new IncomingConnection({ connection, _switch, transportKey })

connFSM.once('error', (err) => log(err))
connFSM.once('private', (_conn) => {
// Use the custom handler, if it was provided
if (handler) {
return handler(_conn)
}
connFSM.encrypt()
})
connFSM.once('encrypted', () => connFSM.upgrade())

connFSM.protect()
}
}
}

module.exports = listener
Loading

0 comments on commit 7dcabdd

Please sign in to comment.