Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: ability to override default bitswap max message size #152

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d1a13f2
feature: ability to override default bitswap max message size
ya7ya Sep 27, 2017
bd50ee3
setting a minimum msg size, tests for go-interop and variable msg siz…
ya7ya Oct 12, 2017
339503d
chore: update deps
daviddias Oct 24, 2017
751d436
fix: add missing multicodec dependency (#155)
pgte Nov 8, 2017
fa797a0
chore: update deps
daviddias Nov 8, 2017
1a62f49
chore: update contributors
daviddias Nov 8, 2017
ab69719
chore: release version v0.17.3
daviddias Nov 8, 2017
a8b1e07
feat: windows interop (#154)
richardschneider Nov 10, 2017
1eea363
chore: update contributors
daviddias Nov 10, 2017
91aaedc
chore: release version v0.17.4
daviddias Nov 10, 2017
f2c8ea2
chore: update deps
daviddias Nov 23, 2017
6886a59
chore: updating CI files (#157)
victorb Nov 27, 2017
17e15d0
feat: stats improvements (#158)
pgte Nov 27, 2017
519e2a9
test: port go tests (#159)
pgte Dec 15, 2017
095f467
chore: update contributors
daviddias Dec 15, 2017
349b3bc
chore: release version v0.18.0
daviddias Dec 15, 2017
8e91def
fix: getMany: ensuring we set the want list (#162)
pgte Jan 28, 2018
ff978d0
feat: per-peer stats (#166)
pgte Feb 6, 2018
b349085
feat: added getMany performance tests (#164)
pgte Feb 6, 2018
89b263a
chore: update deps
daviddias Feb 6, 2018
1fc73b4
chore: update contributors
daviddias Feb 6, 2018
f090590
chore: release version v0.18.1
daviddias Feb 6, 2018
37f19cf
feature: ability to override default bitswap max message size
ya7ya Sep 27, 2017
114c4cf
setting a minimum msg size, tests for go-interop and variable msg siz…
ya7ya Oct 12, 2017
f9d21c4
Merge branch 'feat/message-size' of github.com:ya7ya/js-ipfs-bitswap …
ya7ya Feb 6, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions src/decision-engine/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const logger = require('../utils').logger
const MAX_MESSAGE_SIZE = 512 * 1024

class DecisionEngine {
constructor (peerId, blockstore, network) {
constructor (peerId, blockstore, network, options) {
this._log = logger(peerId, 'engine')
this.blockstore = blockstore
this.network = network
Expand All @@ -33,6 +33,13 @@ class DecisionEngine {
// List of tasks to be processed
this._tasks = []

options = options || {}
this._maxMessageSize = options.maxMessageSize || MAX_MESSAGE_SIZE
// override Max size to 1kb bytes by minimum.
if (this._maxMessageSize < 1024) {
this._maxMessageSize = 1024
}

this._outbox = debounce(this._processTasks.bind(this), 100)
}

Expand All @@ -42,7 +49,7 @@ class DecisionEngine {
return acc + b.data.byteLength
}, 0)

if (total < MAX_MESSAGE_SIZE) {
if (total < this._maxMessageSize) {
return this._sendSafeBlocks(peer, blocks, cb)
}

Expand All @@ -55,7 +62,7 @@ class DecisionEngine {
batch.push(b)
size += b.data.byteLength

if (size >= MAX_MESSAGE_SIZE ||
if (size >= this._maxMessageSize ||
// need to ensure the last remaining items get sent
outstanding === 0) {
const nextBatch = batch.slice()
Expand Down
6 changes: 3 additions & 3 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ const logger = require('./utils').logger
* @param {Blockstore} blockstore
*/
class Bitswap {
constructor (libp2p, blockstore) {
constructor (libp2p, blockstore, options) {
this._libp2p = libp2p
this._log = logger(this.peerInfo.id)

options = options || {}
// the network delivers messages
this.network = new Network(libp2p, this)

// local database
this.blockstore = blockstore

this.engine = new DecisionEngine(this.peerInfo.id, blockstore, this.network)
this.engine = new DecisionEngine(this.peerInfo.id, blockstore, this.network, options)

// handle message sending
this.wm = new WantManager(this.peerInfo.id, this.network)
Expand Down
69 changes: 57 additions & 12 deletions test/bitswap.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,15 @@ const createLibp2pNode = require('./utils/create-libp2p-node')
const makeBlock = require('./utils/make-block')
const orderedFinish = require('./utils/helpers').orderedFinish

const MAX_MESSAGE_SIZE = 512 * 1024

// Creates a repo + libp2pNode + Bitswap with or without DHT
function createThing (dht, callback) {
function createThing (dht, msgSize, callback) {
if (!callback) {
callback = msgSize
msgSize = MAX_MESSAGE_SIZE
}

waterfall([
(cb) => createTempRepo(cb),
(repo, cb) => {
Expand All @@ -28,7 +35,7 @@ function createThing (dht, callback) {
}, (err, node) => cb(err, repo, node))
},
(repo, libp2pNode, cb) => {
const bitswap = new Bitswap(libp2pNode, repo.blocks)
const bitswap = new Bitswap(libp2pNode, repo.blocks, {maxMessageSize: msgSize})
bitswap.start((err) => cb(err, repo, libp2pNode, bitswap))
}
], (err, repo, libp2pNode, bitswap) => {
Expand All @@ -49,12 +56,13 @@ describe('bitswap without DHT', function () {

before((done) => {
parallel([
(cb) => createThing(false, cb),
(cb) => createThing(false, cb),
(cb) => createThing(false, cb)
(cb) => createThing(false, MAX_MESSAGE_SIZE, cb),
(cb) => createThing(false, MAX_MESSAGE_SIZE, cb),
(cb) => createThing(false, MAX_MESSAGE_SIZE, cb),
(cb) => createThing(false, 32, cb)
], (err, results) => {
expect(err).to.not.exist()
expect(results).to.have.length(3)
expect(results).to.have.length(4)
nodes = results
done()
})
Expand All @@ -77,6 +85,13 @@ describe('bitswap without DHT', function () {
], done)
})

it('connect 1 -> 3 && 2 -> 3', (done) => {
parallel([
(cb) => nodes[1].libp2pNode.dial(nodes[3].libp2pNode.peerInfo, cb),
(cb) => nodes[2].libp2pNode.dial(nodes[3].libp2pNode.peerInfo, cb)
], done)
})

it('put a block in 2, fail to get it in 0', (done) => {
const finish = orderedFinish(2, done)

Expand Down Expand Up @@ -104,12 +119,13 @@ describe('bitswap with DHT', () => {

before((done) => {
parallel([
(cb) => createThing(true, cb),
(cb) => createThing(true, cb),
(cb) => createThing(true, cb)
(cb) => createThing(true, MAX_MESSAGE_SIZE, cb),
(cb) => createThing(true, MAX_MESSAGE_SIZE, cb),
(cb) => createThing(true, MAX_MESSAGE_SIZE, cb),
(cb) => createThing(true, 32, cb)
], (err, results) => {
expect(err).to.not.exist()
expect(results).to.have.length(3)
expect(results).to.have.length(4)
nodes = results
done()
})
Expand All @@ -125,10 +141,11 @@ describe('bitswap with DHT', () => {
}, done)
})

it('connect 0 -> 1 && 1 -> 2', (done) => {
it('connect 0 -> 1 && 1 -> 2 && 2 -> 3', (done) => {
parallel([
(cb) => nodes[0].libp2pNode.dial(nodes[1].libp2pNode.peerInfo, cb),
(cb) => nodes[1].libp2pNode.dial(nodes[2].libp2pNode.peerInfo, cb)
(cb) => nodes[1].libp2pNode.dial(nodes[2].libp2pNode.peerInfo, cb),
(cb) => nodes[2].libp2pNode.dial(nodes[3].libp2pNode.peerInfo, cb)
], done)
})

Expand All @@ -145,4 +162,32 @@ describe('bitswap with DHT', () => {
})
], done)
})

it('put a block in 2, get it in 3', (done) => {
waterfall([
(cb) => makeBlock(cb),
(block, cb) => nodes[2].bitswap.put(block, () => cb(null, block)),
(block, cb) => setTimeout(() => cb(null, block), 400),
(block, cb) => nodes[3].bitswap.get(block.cid, (err, blockRetrieved) => {
expect(err).to.not.exist()
expect(block.data).to.eql(blockRetrieved.data)
expect(block.cid).to.eql(blockRetrieved.cid)
cb()
})
], done)
})

it('put a block in 3, get it in 2', (done) => {
waterfall([
(cb) => makeBlock(cb),
(block, cb) => nodes[3].bitswap.put(block, () => cb(null, block)),
(block, cb) => setTimeout(() => cb(null, block), 400),
(block, cb) => nodes[2].bitswap.get(block.cid, (err, blockRetrieved) => {
expect(err).to.not.exist()
expect(block.data).to.eql(blockRetrieved.data)
expect(block.cid).to.eql(blockRetrieved.cid)
cb()
})
], done)
})
})
2 changes: 2 additions & 0 deletions test/types/message.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ describe('BitswapMessage', () => {
expect(err).to.not.exist()
// TODO
// check the deserialised message
expect(message.serializeToBitswap110()).to.eql(rawMessageFullWantlist)
done()
})
})
Expand All @@ -275,6 +276,7 @@ describe('BitswapMessage', () => {
expect(err).to.not.exist()
// TODO
// check the deserialised message
expect(message.serializeToBitswap110()).to.eql(rawMessageOneBlock)
done()
})
})
Expand Down