From 5c46bc345849682fecf50d7cbdaaff9ade27bafe Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Sat, 27 Oct 2018 23:46:48 +0100 Subject: [PATCH] test: add ipns pubsub tests --- package.json | 4 +- test/name-pubsub.js | 376 +++++++++++++++++++++++++++++++++++++++++ test/node.js | 1 + test/pubsub.js | 17 +- test/utils/wait-for.js | 24 +++ 5 files changed, 404 insertions(+), 18 deletions(-) create mode 100644 test/name-pubsub.js create mode 100644 test/utils/wait-for.js diff --git a/package.json b/package.json index dfd0e391..241bf317 100644 --- a/package.json +++ b/package.json @@ -51,8 +51,8 @@ "form-data": "^2.3.2", "go-ipfs-dep": "~0.4.17", "hat": "0.0.3", - "ipfs": "~0.33.0-rc.2", - "ipfs-api": "^25.0.0", + "ipfs": "ipfs/js-ipfs#feat/ipns-over-pubsub", + "ipfs-api": "ipfs/js-ipfs-api#master", "ipfs-unixfs": "~0.1.15", "ipfsd-ctl": "~0.39.1", "left-pad": "^1.3.0", diff --git a/test/name-pubsub.js b/test/name-pubsub.js new file mode 100644 index 00000000..523421a3 --- /dev/null +++ b/test/name-pubsub.js @@ -0,0 +1,376 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +const { fromB58String } = require('multihashes') + +const parallel = require('async/parallel') +const retry = require('async/retry') +const series = require('async/series') + +const IPFS = require('ipfs') +const DaemonFactory = require('ipfsd-ctl') + +const waitFor = require('./utils/wait-for') + +const config = { + Addresses: { + API: '/ip4/0.0.0.0/tcp/0', + Gateway: '/ip4/0.0.0.0/tcp/0', + Swarm: [] + }, + Bootstrap: [] +} + +const spawnJsDaemon = (callback) => { + // DaemonFactory.create({ type: 'js' }) + DaemonFactory.create({ type: 'proc', exec: IPFS }) // TODO change after debuggin + .spawn({ + disposable: true, + initOptions: { bits: 512 }, + args: ['--enable-namesys-pubsub'], // enable ipns over pubsub + config + }, callback) +} + +const spawnGoDaemon = (callback) => { + DaemonFactory.create() + .spawn({ + disposable: true, + initOptions: { bits: 1024 }, + args: ['--enable-namesys-pubsub'], + config + }, callback) +} + +const subscribeToReceiveByPubsub = (nodeA, nodeB, nodeAId, callback) => { + const topic = `/ipns/${fromB58String(nodeAId.id).toString()}` + let subscribed = false + + nodeB.api.name.resolve(nodeAId.id, (err) => { + expect(err).to.exist() + + function checkMessage(msg) { + subscribed = true + } + + series([ + (cb) => waitForPeerToSubscribe(nodeB.api, cb), + (cb) => nodeB.api.pubsub.subscribe(topic, checkMessage, cb), + (cb) => nodeA.api.name.publish(ipfsRef, { resolve: false }, cb), + (cb) => nodeA.api.name.resolve(nodeAId.id, cb), + /* (cb) => waitFor(() => subscribed === true, cb), */ (cb) => setTimeout(() => cb(), 10000), + (cb) => nodeB.api.name.resolve(nodeAId.id, cb) + ], (err, res) => { + expect(err).to.not.exist() + expect(res).to.exist() + + expect(res[2].name).to.equal(nodeAId.id) // Published to Node A ID + expect(res[3].path || res[3]).to.equal(ipfsRef) // TODO: remove path once not using proc daemon + expect(res[5].path || res[5]).to.equal(ipfsRef) + + callback() + }) + }) +} + +const ipfsRef = '/ipfs/QmPFVLPmp9zv5Z5KUqLhe2EivAGccQW2r7M7jhVJGLZoZU' + +describe.only('name-pubsub', () => { + describe('js nodes', () => { + let nodeAId + let nodes = [] + + // Spawn daemons + before(function (done) { + // CI takes longer to instantiate the daemon, so we need to increase the timeout + this.timeout(80 * 1000) + + series([ + (cb) => spawnJsDaemon(cb), + (cb) => spawnJsDaemon(cb), + ], (err, daemons) => { + expect(err).to.not.exist() + nodes = daemons + done() + }) + }) + + // Get node id + before(function (done) { + nodes[0].api.id((err, res) => { + expect(err).to.not.exist() + expect(res.id).to.exist() + + nodeAId = res + nodes[1].api.swarm.connect(res.addresses[0], done) + }) + }) + + after(function (done) { + this.timeout(60 * 1000) + parallel(nodes.map((node) => (cb) => node.stop(cb)), done) + }) + + it('should get enabled state of pubsub', function (done) { + nodes[0].api.name.pubsub.state((err, state) => { + expect(err).to.not.exist() + expect(state).to.exist() + expect(state.enabled).to.equal(true) + + done() + }) + }) + + it('should publish the received record to a js node subscriber', function (done) { + this.timeout(300 * 1000) + + subscribeToReceiveByPubsub(nodes[0], nodes[1], nodeAId, done) + }) + }) + + describe('go nodes', () => { + let nodeAId + let nodes = [] + + // Spawn daemons + before(function (done) { + // CI takes longer to instantiate the daemon, so we need to increase the timeout + this.timeout(80 * 1000) + + series([ + (cb) => spawnGoDaemon(cb), + (cb) => spawnGoDaemon(cb), + ], (err, daemons) => { + expect(err).to.not.exist() + nodes = daemons + + done() + }) + }) + + // Connect nodes + before(function (done) { + nodes[0].api.id((err, res) => { + expect(err).to.not.exist() + expect(res.id).to.exist() + + nodeAId = res + nodes[1].api.swarm.connect(res.addresses[0], done) + }) + }) + + after(function (done) { + this.timeout(60 * 1000) + parallel(nodes.map((node) => (cb) => node.stop(cb)), done) + }) + + it('should get enabled state of pubsub', function (done) { + nodes[0].api.name.pubsub.state((err, state) => { + expect(err).to.not.exist() + expect(state).to.exist() + expect(state.enabled).to.equal(true) + + done() + }) + }) + + it('should publish the received record to a go node subscriber', function (done) { + this.timeout(300 * 1000) + + subscribeToReceiveByPubsub(nodes[0], nodes[1], nodeAId, done) + }) + }) + + describe.skip('bybrid nodes', () => { + let nodeAId + let nodeBId + let nodes = [] + + // Spawn daemons + before(function (done) { + // CI takes longer to instantiate the daemon, so we need to increase the timeout + this.timeout(80 * 1000) + + series([ + (cb) => spawnGoDaemon(cb), + (cb) => spawnJsDaemon(cb), + ], (err, daemons) => { + expect(err).to.not.exist() + nodes = daemons + + done() + }) + }) + + // Get node ids + before(function (done) { + this.timeout(60 * 1000) + + parallel([ + (cb) => nodes[0].api.id(cb), + (cb) => nodes[1].api.id(cb) + ], (err, ids) => { + expect(err).to.not.exist() + expect(ids).to.exist() + expect(ids[0].id).to.exist() + expect(ids[1].id).to.exist() + + nodeAId = ids[0] + nodeBId = ids[1] + + nodes[1].api.swarm.connect(ids[0].addresses[0], done) + }) + }) + + after(function (done) { + this.timeout(60 * 1000) + parallel(nodes.map((node) => (cb) => node.stop(cb)), done) + }) + + it('should get enabled state of pubsub', function (done) { + nodes[0].api.name.pubsub.state((err, state) => { + expect(err).to.not.exist() + expect(state).to.exist() + expect(state.enabled).to.equal(true) + + done() + }) + }) + + it('should publish the received record to a go node and a js subscriber should receive it', function (done) { + this.timeout(250 * 1000) + const topic = `/ipns/${fromB58String(nodeAId.id).toString()}` + let subscribed = false + + nodes[1].api.name.resolve(nodeAId.id, (err) => { + expect(err).to.exist() + + function checkMessage(msg) { + console.log('msg received', msg) + subscribed = true + } + + series([ + (cb) => waitForPeerToSubscribe(nodes[1].api, cb), + (cb) => nodes[1].api.pubsub.subscribe(topic, checkMessage, cb), + (cb) => nodes[0].api.name.publish(ipfsRef, { resolve: false }, cb), + (cb) => nodes[0].api.name.resolve(nodeAId.id, cb), + // (cb) => waitFor(() => subscribed === true, 50 * 1000, cb), + (cb) => setTimeout(() => cb(), 10000), + (cb) => nodes[1].api.name.resolve(nodeAId.id, cb) + ], (err, res) => { + console.log('res', res) + console.log('err', err) + expect(err).to.not.exist() + expect(res).to.exist() + + expect(res[2].name).to.equal(nodeAId.id) // Published to Node A ID + expect(res[3].path).to.equal(ipfsRef) + expect(res[5].path).to.equal(ipfsRef) + done() + }) + }) + }) + + it('should publish the received record to a js node and a go subscriber should receive it', function (done) { + this.timeout(350 * 1000) + const topic = `/ipns/${fromB58String(nodeBId.id).toString()}` + let subscribed = false + + nodes[0].api.name.resolve(nodeBId.id, (err, res) => { + expect(err).to.exist() + + function checkMessage(msg) { + console.log('msg received') + subscribed = true + } + + series([ + (cb) => waitForPeerToSubscribe(nodes[0].api, cb), + // (cb) => nodes[0].api.pubsub.subscribe(topic, checkMessage, cb), + (cb) => nodes[1].api.name.publish(ipfsRef, { resolve: false }, cb), + (cb) => setTimeout(cb, 30000), + (cb) => nodes[1].api.name.resolve(nodeBId.id, cb), + // (cb) => waitFor(() => subscribed === true, 50 * 1000, cb), + (cb) => nodes[0].api.name.resolve(nodeBId.id, cb) + ], (err, res) => { + console.log('res', res) + console.log('err', err) + expect(err).to.not.exist() + expect(res).to.exist() + + expect(res[2].name).to.equal(nodeBId.id) // Published to Node A ID + expect(res[3]).to.equal(ipfsRef) + expect(res[5]).to.equal(ipfsRef) + done() + }) + }) + }) + }) +}) + +// Wait until a peer subscribes a topic +const waitForPeerToSubscribe = (daemon, callback) => { + retry({ + times: 5, + interval: 2000 + }, (next) => { + daemon.name.pubsub.subs((error, res) => { + if (error) { + return next(error) + } + + if (!res || !res.length) { + return next(new Error('Could not find subscription')) + } + + return next(null, res[0]) + }) + }, callback) +} + +/* +// Wait until a peer subscribes a topic +const waitForPeerToSubscribe2 = (daemon, callback) => { + retry({ + times: 5, + interval: 2000 + }, (next) => { + daemon.pubsub.ls((error, res) => { + if (error) { + return next(error) + } + + if (!res || !res.length) { + return next(new Error('Could not find subscription')) + } + + return next(null, res[0]) + }) + }, callback) +} + +const waitForPeerKnowingOfTheSubscription = (daemon, subscription, callback) => { + retry({ + times: 5, + interval: 2000 + }, (next) => { + daemon.pubsub.peers(subscription, (error, res) => { + if (error) { + return next(error) + } + + if (!res || !res.length) { + return next(new Error('Peer did not subscribe')) + } + + return next(null, res[0]) + }) + }, callback) +} +*/ diff --git a/test/node.js b/test/node.js index a83c4c90..a57e77b2 100644 --- a/test/node.js +++ b/test/node.js @@ -8,3 +8,4 @@ require('./exchange-files') require('./kad-dht') require('./pin') require('./files') +require('./name-pubsub') diff --git a/test/pubsub.js b/test/pubsub.js index 58049d4a..378d649b 100644 --- a/test/pubsub.js +++ b/test/pubsub.js @@ -13,22 +13,7 @@ const auto = require('async/auto') const DaemonFactory = require('ipfsd-ctl') -/* - * Wait for a condition to become true. When its true, callback is called. - */ -function waitFor (predicate, callback) { - const ttl = Date.now() + (10 * 1000) - const self = setInterval(() => { - if (predicate()) { - clearInterval(self) - return callback() - } - if (Date.now() > ttl) { - clearInterval(self) - return callback(new Error('waitFor time expired')) - } - }, 500) -} +const waitFor = require('./utils/wait-for') const connect = (jsD, goD, callback) => { parallel([ diff --git a/test/utils/wait-for.js b/test/utils/wait-for.js new file mode 100644 index 00000000..2b4a10d7 --- /dev/null +++ b/test/utils/wait-for.js @@ -0,0 +1,24 @@ +'use strict' + +/* + * Wait for a condition to become true. When its true, callback is called. + */ +module.exports = (predicate, ttl, callback) => { + if (typeof ttl === 'function') { + callback = ttl + ttl = Date.now() + (10 * 1000) + } else { + ttl = Date.now() + ttl + } + + const self = setInterval(() => { + if (predicate()) { + clearInterval(self) + return callback() + } + if (Date.now() > ttl) { + clearInterval(self) + return callback(new Error('waitFor time expired')) + } + }, 500) +}