Skip to content
This repository has been archived by the owner on Dec 10, 2020. It is now read-only.

Commit

Permalink
Merge pull request #133 from ethereumjs/improve-sync-reliability
Browse files Browse the repository at this point in the history
Improve sync reliability
  • Loading branch information
holgerd77 authored Jun 19, 2020
2 parents a9eb167 + 3672296 commit 9b32bd3
Show file tree
Hide file tree
Showing 11 changed files with 56 additions and 12 deletions.
26 changes: 26 additions & 0 deletions lib/net/peerpool.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class PeerPool extends EventEmitter {
this.logger = options.logger
this.maxPeers = options.maxPeers
this.pool = new Map()
this.noPeerPeriods = 0
this.init()
}

Expand All @@ -63,6 +64,7 @@ class PeerPool extends EventEmitter {
server.on('disconnected', (peer) => { this.disconnected(peer) })
})
this.opened = true
this._statusCheckInterval = setInterval(await this._statusCheck.bind(this), 20000)
}

/**
Expand All @@ -72,6 +74,7 @@ class PeerPool extends EventEmitter {
async close () {
this.pool.clear()
this.opened = false
clearInterval(this._statusCheckInterval)
}

/**
Expand Down Expand Up @@ -185,6 +188,29 @@ class PeerPool extends EventEmitter {
}
}
}

/**
* Peer pool status check on a repeated interval
*/
async _statusCheck () {
if (this.size === 0) {
this.noPeerPeriods += 1
if (this.noPeerPeriods >= 3) {
const promises = this.servers.map(async server => {
if (server.bootstrap) {
this.logger.info('Retriggering bootstrap.')
await server.bootstrap()
}
})
await Promise.all(promises)
this.noPeerPeriods = 0
} else {
this.logger.info('Looking for suited peers...')
}
} else {
this.noPeerPeriods = 0
}
}
}

module.exports = PeerPool
2 changes: 1 addition & 1 deletion lib/net/protocol/ethprotocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class EthProtocol extends Protocol {
* Create eth protocol
* @param {Object} options constructor parameters
* @param {Chain} options.chain blockchain
* @param {number} [options.timeout=5000] handshake timeout in ms
* @param {number} [options.timeout=8000] handshake timeout in ms
* @param {Logger} [options.logger] logger instance
*/
constructor (options) {
Expand Down
2 changes: 1 addition & 1 deletion lib/net/protocol/lesprotocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class LesProtocol extends Protocol {
* @param {Chain} options.chain blockchain
* @param {FlowControl} [options.flow] flow control manager. if undefined,
* header serving will be disabled
* @param {number} [options.timeout=5000] handshake timeout in ms
* @param {number} [options.timeout=8000] handshake timeout in ms
* @param {Logger} [options.logger] logger instance
*/
constructor (options) {
Expand Down
4 changes: 2 additions & 2 deletions lib/net/protocol/protocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const { defaultLogger } = require('../../logging')

const defaultOptions = {
logger: defaultLogger,
timeout: 5000
timeout: 8000
}

/**
Expand All @@ -28,7 +28,7 @@ class Protocol extends EventEmitter {
/**
* Create new protocol
* @param {Object} options constructor parameters
* @param {number} [options.timeout=5000] handshake timeout in ms
* @param {number} [options.timeout=8000] handshake timeout in ms
* @param {Logger} [options.logger] logger instance
*/
constructor (options) {
Expand Down
15 changes: 12 additions & 3 deletions lib/net/server/rlpxserver.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,25 @@ class RlpxServer extends Server {
this.initDpt()
this.initRlpx()

this.bootnodes.map(node => {
await this.bootstrap()

this.started = true
}

/**
* Bootstrap bootnode peers from the network
* @return {Promise}
*/
async bootstrap () {
const promises = this.bootnodes.map(node => {
const bootnode = {
address: node.ip,
udpPort: node.port,
tcpPort: node.port
}
return this.dpt.bootstrap(bootnode).catch(e => this.error(e))
})

this.started = true
await Promise.all(promises)
}

/**
Expand Down
2 changes: 1 addition & 1 deletion lib/service/ethereumservice.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const defaultOptions = {
lightserv: false,
common: new Common('mainnet', 'chainstart'),
minPeers: 3,
timeout: 5000,
timeout: 8000,
interval: 1000
}

Expand Down
5 changes: 4 additions & 1 deletion lib/service/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,13 @@ class Service extends EventEmitter {
}

/**
* Start service
* Stop service
* @return {Promise}
*/
async stop () {
if (this.opened) {
await this.close()
}
this.running = false
this.logger.info(`Stopped ${this.name} service.`)
}
Expand Down
2 changes: 1 addition & 1 deletion lib/sync/fetcher/fetcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const { defaultLogger } = require('../../logging')
const defaultOptions = {
common: new Common('mainnet', 'chainstart'),
logger: defaultLogger,
timeout: 5000,
timeout: 8000,
interval: 1000,
banTime: 60000,
maxQueue: 16,
Expand Down
7 changes: 5 additions & 2 deletions test/blockchain/chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ tape('[Chain]', t => {
t.equal(chain.genesis.hash.toString('hex'),
chain.blocks.latest.hash().toString('hex'),
'get chain.block.latest')
chain.close()
await chain.close()
t.end()
})

Expand Down Expand Up @@ -80,6 +80,7 @@ tape('[Chain]', t => {
await chain.getTd(block.hash())
t.ok(chain.opened, 'chain should open if getTd() called')
t.equal(await chain.open(), false, 'skip open if already opened')
await chain.close()
t.end()
})

Expand All @@ -89,6 +90,7 @@ tape('[Chain]', t => {
t.notOk(await chain.putBlocks(), 'add undefined block')
t.notOk(await chain.putBlocks(null), 'add null block')
t.notOk(await chain.putBlocks([]), 'add empty block list')
await chain.close()
t.end()
})

Expand All @@ -98,6 +100,7 @@ tape('[Chain]', t => {
t.notOk(await chain.putHeaders(), 'add undefined header')
t.notOk(await chain.putHeaders(null), 'add null header')
t.notOk(await chain.putHeaders([]), 'add empty header list')
await chain.close()
t.end()
})

Expand All @@ -112,7 +115,7 @@ tape('[Chain]', t => {
await chain.putBlocks([block])
t.equal(chain.blocks.td.toString(16), '4abcdffff', 'get chain.td')
t.equal(chain.blocks.height.toString(10), '1', 'get chain.height')
chain.close()
await chain.close()
t.end()
})
})
1 change: 1 addition & 0 deletions test/integration/peerpool.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ tape('[Integration:PeerPool]', async (t) => {

async function destroy (server, pool) {
await server.stop()
await pool.close()
}

t.test('should open', async (t) => {
Expand Down
2 changes: 2 additions & 0 deletions test/service/fastethereumservice.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ tape('[FastEthereumService]', t => {
service.pool.emit('added', 'peer0')
service.pool.emit('removed', 'peer0')
service.pool.emit('error', 'error1')
await service.close()
})

t.test('should start/stop', async (t) => {
Expand All @@ -69,6 +70,7 @@ tape('[FastEthereumService]', t => {
td.verify(service.synchronizer.stop())
td.verify(server.start())
t.notOk(await service.stop(), 'already stopped')
await server.stop()
t.end()
})

Expand Down

0 comments on commit 9b32bd3

Please sign in to comment.