Skip to content
This repository has been archived by the owner on Apr 22, 2024. It is now read-only.

Commit

Permalink
Issue 189 | Add WebsocketProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
ryan-rowland committed Mar 24, 2018
1 parent 9b937c0 commit 822db54
Show file tree
Hide file tree
Showing 9 changed files with 9,522 additions and 7,506 deletions.
6,045 changes: 3,262 additions & 2,783 deletions dist/ProviderEngine.js

Large diffs are not rendered by default.

10,807 changes: 6,098 additions & 4,709 deletions dist/ZeroClientProvider.js

Large diffs are not rendered by default.

24 changes: 19 additions & 5 deletions example.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,27 @@
const ProviderEngine = require('./index.js')
const ZeroClientProvider = require('./zero.js')
const FetchProvider = require('./subproviders/fetch')

const fetchEngine = ZeroClientProvider({
getAccounts: function(){},
dataSubprovider: new FetchProvider({
rpcUrl: 'https://mainnet.infura.io',
})
})

// create engine
const engine = ZeroClientProvider({
const websocketEngine = ZeroClientProvider({
getAccounts: function(){},
rpcUrl: 'https://mainnet.infura.io/',
rpcUrl: 'wss://mainnet.infura.io/_ws',
//debug: true,
})

// log new blocks
engine.on('block', function(block){
console.log('BLOCK CHANGED:', '#'+block.number.toString('hex'), '0x'+block.hash.toString('hex'))
})
websocketEngine.on('block', function(block) {
console.log('WS:', '#'+block.number.toString('hex'), '0x'+block.hash.toString('hex'))
})

// log new blocks
fetchEngine.on('block', function(block) {
console.log('Fetch:', '#'+block.number.toString('hex'), '0x'+block.hash.toString('hex'))
})
9 changes: 6 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
"test": "node test/index.js",
"bundle-zero": "browserify -s ZeroClientProvider -e zero.js > dist/ZeroClientProvider.js",
"bundle-engine": "browserify -s ProviderEngine -e index.js > dist/ProviderEngine.js",
"bundle": "mkdir -p ./dist && npm run bundle-engine && npm run bundle-zero"
"bundle": "cross-env mkdir -p ./dist && npm run bundle-engine && npm run bundle-zero"
},
"author": "",
"license": "ISC",
"dependencies": {
"async": "^2.5.0",
"clone": "^2.0.0",
"eth-sig-util": "^1.4.2",
"eth-block-tracker": "^2.2.2",
"eth-sig-util": "^1.4.2",
"ethereumjs-block": "^1.2.2",
"ethereumjs-tx": "^1.2.0",
"ethereumjs-util": "^5.1.1",
Expand All @@ -30,15 +30,18 @@
"semaphore": "^1.0.3",
"solc": "^0.4.2",
"tape": "^4.4.0",
"ws": "^5.1.0",
"xhr": "^2.2.0",
"xtend": "^4.0.1"
},
"devDependencies": {
"babel-preset-es2015": "^6.24.1",
"babel-preset-stage-0": "^6.24.1",
"backoff": "^2.5.0",
"browserify": "^14.0.0"
},
"browser": {
"request": false
"request": false,
"ws": "./util/ws.js"
}
}
1 change: 1 addition & 0 deletions subproviders/fetch.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ RpcSource.prototype.handleRequest = function (payload, next, end) {

// overwrite id to not conflict with other concurrent users
const newPayload = createPayload(payload)
newPayload.jsonrpc = '2.0'
// remove extra parameter from request
delete newPayload.origin

Expand Down
130 changes: 130 additions & 0 deletions subproviders/websocket.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
const Backoff = require('backoff')
const createPayload = require('../util/create-payload')
const inherits = require('util').inherits
const Subprovider = require('./subprovider')
const WebSocket = require('ws')

class WebsocketProvider extends Subprovider {
constructor({ rpcUrl, debug }) {
super()

Object.defineProperties(this, {
_backoff: {
value: Backoff.exponential({
randomisationFactor: 0.2,
maxDelay: 5000
})
},
_connectTime: {
value: null,
writable: true
},
_log: {
value: debug
? (...args) => console.info.apply(console, ['[WSProvider]', ...args])
: () => { }
},
_pendingRequests: {
value: new Map()
},
_socket: {
value: null,
writable: true
},
_unhandledRequests: {
value: []
},
_url: {
value: rpcUrl
}
})

this._handleSocketClose = this._handleSocketClose.bind(this)
this._handleSocketMessage = this._handleSocketMessage.bind(this)
this._handleSocketOpen = this._handleSocketOpen.bind(this)

// Called when a backoff timeout has finished. Time to try reconnecting.
this._backoff.on('ready', () => {
this._openSocket()
})

this._openSocket()
}

handleRequest(payload, next, end) {
if (!this._socket || this._socket.readyState !== 1) {
this._unhandledRequests.push(Array.from(arguments))
this._log('Socket not open. Request queued.')
return;
}

this._pendingRequests.set(payload.id, Array.from(arguments))

const newPayload = createPayload(payload)
delete newPayload.origin

this._socket.send(JSON.stringify(newPayload))
this._log('Request sent:', newPayload.method)
}

_handleSocketClose({ reason, code }) {
this._log(`Socket closed, code ${code} (${reason || 'no reason'})`);
// If the socket has been open for longer than 5 seconds, reset the backoff
if (this._connectTime && Date.now() - this._connectTime > 5000) {
this._backoff.reset()
}

this._socket.removeEventListener('close', this._handleSocketClose)
this._socket.removeEventListener('message', this._handleSocketMessage)
this._socket.removeEventListener('open', this._handleSocketOpen)

this._socket = null
this._backoff.backoff()
}

_handleSocketMessage(message) {
let data;

try {
data = JSON.parse(message.data)
} catch (e) {
this._log('Received a message that is not valid JSON:', message)
return;
}

this._log('Received message ID:', data.id)

if (!this._pendingRequests.has(data.id)) {
return
}

const [payload, next, end] = this._pendingRequests.get(data.id)
this._pendingRequests.delete(data.id)
end(data.error && data.error.message, data.result)
}

_handleSocketOpen() {
this._log('Socket open.')
this._connectTime = Date.now();

// Any pending requests need to be resent because our session was lost
// and will not get responses for them in our new session.
this._pendingRequests.forEach(value => this._unhandledRequests.push(value))
this._pendingRequests.clear()

const unhandledRequests = this._unhandledRequests.splice(0, this._unhandledRequests.length)
unhandledRequests.forEach(request => {
this.handleRequest.apply(this, request)
})
}

_openSocket() {
this._log('Opening socket...')
this._socket = new WebSocket(this._url)
this._socket.addEventListener('close', this._handleSocketClose)
this._socket.addEventListener('message', this._handleSocketMessage)
this._socket.addEventListener('open', this._handleSocketOpen)
}
}

module.exports = WebsocketProvider
3 changes: 1 addition & 2 deletions util/create-payload.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ const extend = require('xtend')
module.exports = createPayload


function createPayload(data){
function createPayload(data) {
return extend({
// defaults
id: getRandomId(),
jsonrpc: '2.0',
params: [],
// user-specified
}, data)
Expand Down
1 change: 1 addition & 0 deletions util/ws.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
module.exports = WebSocket;
8 changes: 4 additions & 4 deletions zero.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const InflightCacheSubprovider = require('./subproviders/inflight-cache')
const HookedWalletSubprovider = require('./subproviders/hooked-wallet.js')
const SanitizingSubprovider = require('./subproviders/sanitizer.js')
const RpcSubprovider = require('./subproviders/rpc.js')
const FetchSubprovider = require('./subproviders/fetch.js')
const WebsocketSubprovider = require('./subproviders/websocket.js')


module.exports = ZeroClientProvider
Expand Down Expand Up @@ -67,9 +67,9 @@ function ZeroClientProvider(opts){
engine.addProvider(idmgmtSubprovider)

// data source
const dataSubprovider = opts.dataSubprovider || new FetchSubprovider({
rpcUrl: opts.rpcUrl || 'https://mainnet.infura.io/',
originHttpHeaderKey: opts.originHttpHeaderKey,
const dataSubprovider = opts.dataSubprovider || new WebsocketSubprovider({
rpcUrl: opts.rpcUrl || 'wss://mainnet.infura.io/_ws',
debug: opts.debug,
})
engine.addProvider(dataSubprovider)

Expand Down

0 comments on commit 822db54

Please sign in to comment.