Skip to content

Commit

Permalink
start updating
Browse files Browse the repository at this point in the history
  • Loading branch information
joehand committed Sep 1, 2017
1 parent 61f5cf9 commit 63da70b
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 272 deletions.
115 changes: 6 additions & 109 deletions cli.js
Original file line number Diff line number Diff line change
@@ -1,116 +1,13 @@
#!/usr/bin/env node

var args = require('minimist')(process.argv.slice(2), {
boolean: ['debug']
default: {
dir: process.cwd()
}
})

if (!args._[0]) {
console.error('Usage: dat-push <server-key> --dir=directory')
console.error('Please specify a <server-key>')
process.exit(1)
}

var DatPush = require('.')
var logger = require('status-logger')
var chalk = require('chalk')
var prettyBytes = require('pretty-bytes')
var prettyHrtime = require('pretty-hrtime')

var destKeys = args._
var dir = args.dir || process.cwd()
var datPush = DatPush({dir: dir})

var pending = []
var timers = {}
var connections = 0
var longestKey = 0
var datInfo = ['Reading Dat directory...']
var serverInfo = [pendingConnMsg(destKeys.length)]
var progressInfo = []
var log = logger([datInfo, serverInfo, progressInfo], {debug: args.debug})
log.print()
setInterval(function () {
log.print()
}, 200)

destKeys.forEach(function (serverKey, i) {
if (serverKey.length > longestKey) longestKey = serverKey.length
pending.push(serverKey)
timers[serverKey] = process.hrtime()
return pushServer(serverKey)
})

function pushServer (serverKey) {
datPush.push(serverKey, function (err) {
if (err) {
console.log(err)
process.exit(1)
}
pending.splice(pending.indexOf(serverKey), 1)
if (!pending.length) {
progressInfo.push('\nPush Complete')
log.print()
process.exit(0)
}
})
}
datPush.once('dat-open', function () {
if (!datPush.dat.resume) datInfo[0] = 'No dat in directory, creating a new dat. This could take some time.'
})

datPush.on('connect', function (key) {
connections++
var msg = `Connected: ${connections} server${connections > 1 ? 's' : ''}`
if (destKeys.length > connections) msg += ' | ' + pendingConnMsg(destKeys.length - connections)
serverInfo[0] = msg
})

datPush.once('replication-ready', function (key) {
datInfo[0] = 'Pushing Dat'
datInfo[1] = ` Key: ${datPush.dat.archive.key.toString('hex')}`
datInfo[2] = ` Size: ${prettyBytes(datPush.dat.archive.content.bytes)}`
datInfo[3] = '' // padding
DatPush(args.dir, args._, function (err) {
if (err) throw err
console.log('push done, maybe?')
})

datPush.on('replicating', function (key) {
var index = destKeys.indexOf(key)
progressInfo[index] = chalk.blue.bold(key)
})

datPush.on('progress', function (key, remote, total) {
var percent = remote / total
var index = destKeys.indexOf(key)
var msg = percent === 1 ? chalk.green.bold(key) : chalk.blue.bold(key)
var spacer = Array(longestKey - key.length + 3).join(' ')
msg += spacer + progressBar(percent) + ' ' + Math.round(percent * 100) + '%'
msg += ' ' + prettyHrtime(process.hrtime(timers[key]))
progressInfo[index] = msg
})

datPush.once('error', function (err) {
console.error(err)
process.exit(1)
})

function progressBar (percent) {
var width = 30
var cap = '>'
var ends = ['[', ']']
var spacer = Array(width).join(' ')
var progressVal = ''
var val = Math.round(percent * width)

if (isFinite(val) && val > 0) {
progressVal = Array(val).join('=')
progressVal += cap
}
progressVal += spacer
progressVal = progressVal.substring(0, width)

if (percent < 1) return ends[0] + chalk.blue(progressVal) + ends[1]
return ends[0] + chalk.green(progressVal) + ends[1]
}

function pendingConnMsg (val) {
return `Waiting for connection to ${val} server${val > 1 ? 's' : ''}`
}
145 changes: 65 additions & 80 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,98 +1,83 @@
var events = require('events')
var util = require('util')
var Dat = require('dat-js')
var network = require('peer-network')()
var pump = require('pump')
var assert = require('assert')
var dns = require('dns')
var Dat = require('dat-node')
var debug = require('debug')('dat-push')

module.exports = DatPush

function DatPush (opts) {
if (!(this instanceof DatPush)) return new DatPush(opts)
if (!opts.dir) throw new Error('Directory required')
events.EventEmitter.call(this)

var self = this
self.dir = opts.dir
self.dat = Dat({dir: self.dir, discovery: false, watchFiles: false})
self._replicatingServers = []
}

util.inherits(DatPush, events.EventEmitter)
module.exports = function (datPath, pushTo, cb) {
assert.equal(typeof datPath, 'string', 'dat-push: string path required')
if (typeof pushTo === 'function') {
cb = pushTo
pushTo = null
}
assert.equal(typeof cb, 'function', 'dat-push: callback required')
debug('dir', datPath)

DatPush.prototype.push = function (key, cb) {
if (!cb) cb = function (err) { err && self.emit('error', err) }
if (!key) return cb(new Error('must specify key'))
// if (self._servers[key]) return Error('?')
if (!pushTo) return push([])

var self = this
self.datOpen = false
var serverStatus = {
replicating: false,
connected: false
}
var archive
var stream = network.connect(key)
var whitelist = []
doLookup()

stream.once('connect', function (err) {
if (err) return cb(err)
serverStatus.connected = true
self.emit('connect', key)
})
function doLookup (cb) {
var domain = pushTo.pop()
if (!domain) return push(whitelist)

if (self.datOpen) replicate()
else {
self.dat.open(function (err) {
debug('dns lookup', domain)
dns.lookup(domain, function (err, address) {
if (err) return cb(err)
self.emit('dat-open')
run()
whitelist.push(address)
debug('resolved', domain, 'to', address)
doLookup()
})
}

function run () {
archive = self.dat.archive
self.datOpen = true
self.dat.share(replicate)
}
function push (whitelist) {
Dat(datPath, {createIfMissing: false}, function (err, dat) {
if (err) return cb(err)
var stats = dat.trackStats()
var activePeers = 0

function replicate () {
self.emit('replication-ready')
if (!serverStatus.connected) return stream.once('connect', replicate)
dat.importFiles(function (err) {
if (err) return cb(err)
})

serverStatus.replicating = true
self._replicatingServers.push(key)
self.emit('replicating', key)
dat.joinNetwork({
stream: replicate,
whitelist: whitelist
}).on('listening', function () {
debug('joined network')
}).on('connection', function (conn, info) {
debug('new connection', info.host)
activePeers++
})

stream.write(archive.key)
pump(stream, archive.replicate(), stream, function (err) {
if (err) return cb(err)
})
remoteProgress(archive.content)
function replicate (peer) {
var stream = dat.archive.replicate({live: false})
stream.on('error', function (err) {
debug('replicate err', err)
activePeers--
})
stream.on('close', function () {
debug('stream close')
debug('peer count:', stats.peers)
activePeers--
var peers = stats.peers
if (peers.total === peers.complete) return done()
})
return stream
}

function remoteProgress (feed, interval) {
if (!interval) interval = 200
var remoteBlocks = 0
function done () {
debug('done()', activePeers)
if (activePeers > 0) return // TODO: why getting -1?
var peers = stats.peers
if (peers.total !== peers.complete) return

var it = setInterval(function () {
remoteBlocks = update()
self.emit('progress', key, remoteBlocks, feed.blocks)
if (remoteBlocks === feed.blocks) {
stream.end()
clearInterval(it)
self._replicatingServers.splice(self._replicatingServers.indexOf(key), 1)
self.emit('upload-finished', key)
return cb(null)
}
}, interval)
// TODO: check there are no pending connections
// getting multiple closes
if (dat._closed) return

function update () {
var have = 0
var peer = feed.peers[self._replicatingServers.indexOf(key)] // TODO: less hacky way to get correct peer
if (!peer) return 0
for (var j = 0; j < feed.blocks; j++) {
if (peer.remoteBitfield.get(j)) have++
}
return have
dat.close(cb)
}
}
})
}
}
10 changes: 3 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,10 @@
"author": "Joe Hand <[email protected]> (https://joeahand.com/)",
"license": "MIT",
"dependencies": {
"chalk": "^1.1.3",
"dat-js": "^4.0.0",
"dat-node": "^3.5.3",
"debug": "^3.0.1",
"minimist": "^1.2.0",
"peer-network": "^1.1.0",
"pretty-bytes": "^4.0.2",
"pretty-hrtime": "^1.0.2",
"pump": "^1.0.1",
"status-logger": "^3.0.0"
"pump": "^1.0.1"
},
"devDependencies": {},
"repository": {
Expand Down
Loading

0 comments on commit 63da70b

Please sign in to comment.