From 98522b7479f3225cd0960d9555748609992c3bf9 Mon Sep 17 00:00:00 2001 From: Paul Frazee Date: Tue, 12 Feb 2019 13:43:31 -0600 Subject: [PATCH] Close connection after timeout --- index.js | 41 ++++++++++++++++++++++++++++++++++------- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/index.js b/index.js index 352eb03..a1c92da 100755 --- a/index.js +++ b/index.js @@ -3,6 +3,8 @@ var dns = require('dns') var Dat = require('dat-node') var debug = require('debug')('dat-push') +const WANT_TIMEOUT = 5e3 + module.exports = function (datPath, pushTo, cb) { assert.equal(typeof datPath, 'string', 'dat-push: string path required') if (typeof pushTo === 'function') { @@ -15,7 +17,7 @@ module.exports = function (datPath, pushTo, cb) { if (!pushTo) return push([]) var whitelist = [] - doLookup() + doLookup(push) function doLookup (cb) { var domain = pushTo.pop() @@ -26,7 +28,7 @@ module.exports = function (datPath, pushTo, cb) { if (err) return cb(err) whitelist.push(address) debug('resolved', domain, 'to', address) - doLookup() + doLookup(cb) }) } @@ -40,10 +42,12 @@ module.exports = function (datPath, pushTo, cb) { if (err) return cb(err) }) + console.log('Joining network...') dat.joinNetwork({ stream: replicate, whitelist: whitelist }).on('listening', function () { + console.log('Searching for targets...') debug('joined network') }).on('connection', function (conn, info) { debug('new connection', info.host) @@ -52,17 +56,40 @@ module.exports = function (datPath, pushTo, cb) { function replicate (peer) { var stream = dat.archive.replicate({live: false}) - stream.on('error', function (err) { - debug('replicate err', err) - activePeers-- - }) - stream.on('close', function () { + console.log('Replicating with', peer.host) + + const onClose = () => { + console.log('Finished replicating with', peer.host) debug('stream close') debug('peer count:', stats.peers) activePeers-- var peers = stats.peers if (peers.total === peers.complete) return done() + } + + // HACK + // close the stream if no want messages are received for a period + // we will assume that means that sync is finished + // that's not always the case (connection could have just died) + // see https://github.com/joehand/dat-push/issues/9 + // -prf + const endStream = () => { + debug(`no want-msg received in ${WANT_TIMEOUT}ms, closing stream`) + stream.finalize() + onClose() // must call this manually, event doesnt emit + } + const startTheClock = () => setTimeout(endStream, WANT_TIMEOUT) + var to = startTheClock() + stream.on('want', () => { + clearTimeout(to) + to = startTheClock() + }) + + stream.on('error', function (err) { + debug('replicate err', err) + activePeers-- }) + stream.on('close', onClose) return stream }