From 3fe2b06f8e790fe37be9a13fc66c6c1dc10700a0 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 8 Jul 2019 13:08:18 +0100 Subject: [PATCH 1/8] feat: pubsub --- API.md | 204 +++++++++++++++++- README.md | 10 +- package.json | 1 + src/add/index.js | 11 +- src/bitswap/wantlist.js | 10 +- src/block/put.js | 11 +- src/index.js | 7 + src/lib/querystring.js | 12 ++ src/ls.js | 6 +- src/ping.js | 11 +- src/pubsub/ls.js | 18 ++ src/pubsub/peers.js | 24 +++ src/pubsub/publish.js | 43 ++++ src/pubsub/subscribe.js | 58 +++++ src/pubsub/subscription-tracker.js | 52 +++++ src/pubsub/unsubscribe.js | 9 + src/swarm/connect.js | 7 +- src/swarm/peers.js | 21 +- test/interface.spec.js | 30 +-- test/utils/core-adapter/add-from-url.js | 6 +- test/utils/core-adapter/add.js | 6 +- test/utils/core-adapter/bitswap/stat.js | 5 +- test/utils/core-adapter/bitswap/wantlist.js | 6 +- test/utils/core-adapter/block/get.js | 5 +- test/utils/core-adapter/block/put.js | 5 +- test/utils/core-adapter/index.js | 25 ++- test/utils/core-adapter/ls/index.js | 5 +- test/utils/core-adapter/pubsub/handler-map.js | 1 + test/utils/core-adapter/pubsub/subscribe.js | 12 ++ test/utils/core-adapter/pubsub/unsubscribe.js | 9 + 30 files changed, 522 insertions(+), 108 deletions(-) create mode 100644 src/lib/querystring.js create mode 100644 src/pubsub/ls.js create mode 100644 src/pubsub/peers.js create mode 100644 src/pubsub/publish.js create mode 100644 src/pubsub/subscribe.js create mode 100644 src/pubsub/subscription-tracker.js create mode 100644 src/pubsub/unsubscribe.js create mode 100644 test/utils/core-adapter/pubsub/handler-map.js create mode 100644 test/utils/core-adapter/pubsub/subscribe.js create mode 100644 test/utils/core-adapter/pubsub/unsubscribe.js diff --git a/API.md b/API.md index a41163c..0c614ab 100644 --- a/API.md +++ b/API.md @@ -66,11 +66,11 @@ * pin.rm * [ping](#ping) TODO: add docs * [pingPullStream](#pingpullstream) TODO: add docs -* pubsub.publish -* pubsub.ls -* pubsub.peers -* pubsub.subscribe -* pubsub.unsubscribe +* [pubsub.ls](#pubsubls) +* [pubsub.peers](#pubsubpeers) +* [pubsub.publish](#pubsubpublish) +* [pubsub.subscribe](#pubsubsubscribe) +* [pubsub.unsubscribe](#pubsubunsubscribe) * refs * refsPullStream * refs.local @@ -561,3 +561,197 @@ console.log(data.toString('utf8')) hello world! */ ``` + +## pubsub.ls + +List subscribed topics by name. + +### `pubsub.ls([options]): Promise` + +#### Parameters + +* `options` (optional) + * Type: `Object` + * Default: `null` +* `options.signal` (optional) - A signal that can be used to abort the request + * Type: [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) + * Default: `null` + +#### Returns + +An array of subscribed topic names. + +* Type: `Promise` + +#### Examples + +```js +const res = await ipfs.pubsub.ls() +console.log(res) +/* +[ 'my-pubsub-topic' ] +*/ +``` + +## pubsub.peers + +List peers we are currently pubsubbing with, optionally filtered by topic name. + +### `pubsub.peers([topic], [options]): Promise` + +#### Parameters + +* `topic` (optional) - Pubsub topic name to filter peer list by + * Type: `String` + * Default: `null` +* `options` (optional) + * Type: `Object` + * Default: `null` +* `options.signal` (optional) - A signal that can be used to abort the request + * Type: [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) + * Default: `null` + +#### Returns + +An array of string peer IDs. + +* Type: `Promise` + +#### Examples + +```js +const res = await ipfs.pubsub.peers() +console.log(res) +/* +[ 'QmPefeutipT4odZHRyBE3xBcWQxmBxZqS7n5zQxKZP9TNp' ] +*/ +``` + +## pubsub.publish + +Publish a message to a given pubsub topic. + +### `pubsub.publish(topic, message, [options]): Promise` + +#### Parameters + +* `topic` - Pubsub topic name to publish the topic to + * Type: `String` +* `message` - Message to publish + * Type: `Buffer`/`ArrayBuffer`/`String` +* `options` (optional) + * Type: `Object` + * Default: `null` +* `options.signal` (optional) - A signal that can be used to abort the request + * Type: [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) + * Default: `null` + +#### Returns + +`Promise` resolved when the message has been published. + +* Type: `Promise` + +#### Examples + +```js +await ipfs.pubsub.publish('my-pubsub-topic', Buffer.from('hello world!')) +``` + +## pubsub.subscribe + +Subscribe to messages on a given topic. + +**Note that in the browser there is a per-domain open request limit (6 for most browsers)** + +### `pubsub.subscribe(topic, handler, [options]): Promise` + +#### Parameters + +* `topic` - Pubsub topic name to subscribe to messages for + * Type: `String` +* `handler` - A function called every time this node receives a message for the given topic. + * Type: `Function(msg)`. Message properties: + * `from` - Peer ID of the peer this message came from + * Type: `Buffer` + * `data` - Raw message data + * Type: `Buffer` + * `seqno` - 20 byte random message number + * Type: `Buffer` + * `topicIDs` - Topic names this message was published to + * Type: `String[]` +* `options` (optional) + * Type: `Object` + * Default: `null` +* `options.discover` (optional) - Try to discover other peers subscribed to the same topic + * Type: `Boolean` + * Deafult: `false` +* `options.onError` (optional) - An error handler called when the request errors or parsing of a given message fails. It is passed two parameters, the error that occurred and a boolean indicating if it was a fatal error or not (fatal errors terminate the subscription). + * Type: `Function(err, fatal)` + * Default: `null` +* `options.signal` (optional) - A signal that can be used to abort the request + * Type: [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) + * Default: `null` + +#### Returns + +`Promise` resolved when initial subscription has been set up. + +* Type: `Promise` + +#### Examples + +```js +await ipfs.pubsub.subscribe('my-pubsub-topic', msg => { + console.log(msg) + console.log('data: ', msg.data.toString()) +}) +/* +{ + from: , + data: , + seqno: , + topicIDs: [ 'my-pubsub-topic' ] +} +data: hi +*/ +``` + +## pubsub.unsubscribe + +Stop receiving messages for a given topic. + +### `pubsub.unsubscribe(topic, [handler], [options]): Promise` + +#### Parameters + +* `topic` - Pubsub topic name to unsubscribe from. + * Type: `String` +* `handler` (optional) - The handler function currently registered for this topic. If not provided, **all** handlers for the passed topic will be unsubscribed. Note this only works using the Promise API. + * Type: `Function` + * Default: `null` +* `options` (optional) + * Type: `Object` + * Default: `null` +* `options.signal` (optional) - A signal that can be used to abort the request + * Type: [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) + * Default: `null` + +#### Returns + +`Promise` resolved when topic has been unsubscribed. + +* Type: `Promise` + +#### Examples + +```js +const handler = msg => console.log(msg) +await ipfs.pubsub.unsubscribe('my-pubsub-topic', handler) +``` + +Unsubscribe all handlers: + +```js +await ipfs.pubsub.unsubscribe('my-pubsub-topic') +``` diff --git a/README.md b/README.md index 91e490f..1e50e1b 100644 --- a/README.md +++ b/README.md @@ -204,11 +204,11 @@ This module is in heavy development, not all API methods are available (or docum * pin.rm * [ping](./API.md#ping) TODO: add docs * [pingPullStream](./API.md#pingpullstream) TODO: add docs -* pubsub.publish -* pubsub.ls -* pubsub.peers -* pubsub.subscribe -* pubsub.unsubscribe +* [pubsub.ls](./API.md#pubsubls) +* [pubsub.peers](./API.md#pubsubpeers) +* [pubsub.publish](./API.md#pubsubpublish) +* [pubsub.subscribe](./API.md#pubsubsubscribe) +* [pubsub.unsubscribe](./API.md#pubsubunsubscribe) * refs * refsPullStream * refs.local diff --git a/package.json b/package.json index b7cd68c..6a9a229 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,7 @@ "coverage": "npx nyc -r html npm run test:node -- --bail" }, "dependencies": { + "abort-controller": "^3.0.0", "async-iterator-to-pull-stream": "^1.3.0", "buffer": "^5.2.1", "cids": "^0.7.1", diff --git a/src/add/index.js b/src/add/index.js index 6ef9688..76c3f6e 100644 --- a/src/add/index.js +++ b/src/add/index.js @@ -1,7 +1,7 @@ 'use strict' const ndjson = require('iterable-ndjson') -const QueryString = require('querystring') +const { objectToQuery } = require('../lib/querystring') const configure = require('../lib/configure') const { ok, toIterable } = require('../lib/fetch') const { toFormData } = require('./form-data') @@ -11,7 +11,7 @@ module.exports = configure(({ fetch, apiUrl, apiPath, headers }) => { return (input, options) => (async function * () { options = options || {} - const qs = Object.entries({ + const qs = objectToQuery({ 'stream-channels': true, chunker: options.chunker, 'cid-version': options.cidVersion, @@ -28,12 +28,9 @@ module.exports = configure(({ fetch, apiUrl, apiPath, headers }) => { silent: options.silent, trickle: options.trickle, 'wrap-with-directory': options.wrapWithDirectory - }).reduce((obj, [key, value]) => { - if (value != null) obj[key] = value - return obj - }, {}) + }) - const url = `${apiUrl}${apiPath}/add?${QueryString.stringify(qs)}` + const url = `${apiUrl}${apiPath}/add${qs}` const res = await ok(fetch(url, { method: 'POST', signal: options.signal, diff --git a/src/bitswap/wantlist.js b/src/bitswap/wantlist.js index 94817d6..21ee8ac 100644 --- a/src/bitswap/wantlist.js +++ b/src/bitswap/wantlist.js @@ -1,6 +1,6 @@ 'use strict' -const QueryString = require('querystring') +const { objectToQuery } = require('../lib/querystring') const configure = require('../lib/configure') const { ok } = require('../lib/fetch') @@ -13,13 +13,7 @@ module.exports = configure(({ fetch, apiUrl, apiPath, headers }) => { options = options || {} - const qs = {} - - if (peerId) { - qs.peer = peerId - } - - const url = `${apiUrl}${apiPath}/bitswap/wantlist?${QueryString.stringify(qs)}` + const url = `${apiUrl}${apiPath}/bitswap/wantlist${objectToQuery({ peer: peerId })}` const res = await ok(fetch(url, { signal: options.signal, headers: options.headers || headers diff --git a/src/block/put.js b/src/block/put.js index bc05f99..2698396 100644 --- a/src/block/put.js +++ b/src/block/put.js @@ -1,7 +1,7 @@ 'use strict' -const QueryString = require('querystring') const FormData = require('form-data') +const { objectToQuery } = require('../lib/querystring') const configure = require('../lib/configure') const { ok } = require('../lib/fetch') const toCamel = require('../lib/to-camel') @@ -10,17 +10,14 @@ module.exports = configure(({ fetch, apiUrl, apiPath, headers }) => { const put = async (data, options) => { options = options || {} - const qs = Object.entries({ + const qs = objectToQuery({ format: options.format, mhtype: options.mhtype, mhlen: options.mhlen, pin: options.pin - }).reduce((obj, [key, value]) => { - if (value != null) obj[key] = value - return obj - }, {}) + }) - const url = `${apiUrl}${apiPath}/block/put?${QueryString.stringify(qs)}` + const url = `${apiUrl}${apiPath}/block/put${qs}` const body = new FormData() body.append('file', data) diff --git a/src/index.js b/src/index.js index 0308af8..5ab5aff 100644 --- a/src/index.js +++ b/src/index.js @@ -30,6 +30,13 @@ module.exports = config => { id: callbackify(require('./id')(config)), ping: callbackify(collectify(ping)), pingPullStream: pullify.source(ping), + pubsub: { + ls: callbackify(require('./pubsub/ls')(config)), + peers: callbackify(require('./pubsub/peers')(config)), + publish: callbackify(require('./pubsub/publish')(config)), + subscribe: callbackify(require('./pubsub/subscribe')(config), { minArgs: 2 }), + unsubscribe: callbackify(require('./pubsub/unsubscribe')(config), { minArgs: 2 }) + }, swarm: { connect: callbackify(require('./swarm/connect')(config)), peers: callbackify(require('./swarm/peers')(config)) diff --git a/src/lib/querystring.js b/src/lib/querystring.js new file mode 100644 index 0000000..c074ad2 --- /dev/null +++ b/src/lib/querystring.js @@ -0,0 +1,12 @@ +const QueryString = require('querystring') + +// Convert an object to a query string INCLUDING leading ? +// Excludes null/undefined values +exports.objectToQuery = obj => { + const qs = Object.entries(obj).reduce((obj, [key, value]) => { + if (value != null) obj[key] = value + return obj + }, {}) + + return Object.keys(qs).length ? `?${QueryString.stringify(qs)}` : '' +} diff --git a/src/ls.js b/src/ls.js index 7337b12..d3107ef 100644 --- a/src/ls.js +++ b/src/ls.js @@ -1,6 +1,6 @@ 'use strict' -const QueryString = require('querystring') +const { objectToQuery } = require('./lib/querystring') const configure = require('./lib/configure') const { ok } = require('./lib/fetch') const toCamel = require('./lib/to-camel') @@ -9,9 +9,7 @@ module.exports = configure(({ fetch, apiUrl, apiPath, headers }) => { return (path, options) => (async function * () { options = options || {} - const qs = { arg: path.toString() } - - const url = `${apiUrl}${apiPath}/ls?${QueryString.stringify(qs)}` + const url = `${apiUrl}${apiPath}/ls${objectToQuery({ arg: path.toString() })}` const res = await ok(fetch(url, { signal: options.signal, headers: options.headers || headers diff --git a/src/ping.js b/src/ping.js index 49e298a..442b6d0 100644 --- a/src/ping.js +++ b/src/ping.js @@ -1,7 +1,7 @@ 'use strict' const ndjson = require('iterable-ndjson') -const QueryString = require('querystring') +const { objectToQuery } = require('./lib/querystring') const configure = require('./lib/configure') const { ok, toIterable } = require('./lib/fetch') const toCamel = require('./lib/to-camel') @@ -10,13 +10,8 @@ module.exports = configure(({ fetch, apiUrl, apiPath, headers }) => { return (peerId, options) => (async function * () { options = options || {} - const qs = { arg: peerId } - - if (options.count != null) { - qs.count = options.count - } - - const url = `${apiUrl}${apiPath}/ping?${QueryString.stringify(qs)}` + const qs = objectToQuery({ arg: peerId, count: options.count }) + const url = `${apiUrl}${apiPath}/ping${qs}` const res = await ok(fetch(url, { signal: options.signal, headers: options.headers || headers diff --git a/src/pubsub/ls.js b/src/pubsub/ls.js new file mode 100644 index 0000000..c65c17e --- /dev/null +++ b/src/pubsub/ls.js @@ -0,0 +1,18 @@ +'use strict' + +const configure = require('../lib/configure') +const { ok } = require('../lib/fetch') + +module.exports = configure(({ fetch, apiUrl, apiPath, headers }) => { + return async (options) => { + options = options || {} + + const url = `${apiUrl}${apiPath}/pubsub/ls` + const res = await ok(fetch(url, { + signal: options.signal, + headers: options.headers || headers + })) + const data = await res.json() + return data.Strings || [] + } +}) diff --git a/src/pubsub/peers.js b/src/pubsub/peers.js new file mode 100644 index 0000000..f738677 --- /dev/null +++ b/src/pubsub/peers.js @@ -0,0 +1,24 @@ +'use strict' + +const { objectToQuery } = require('../lib/querystring') +const configure = require('../lib/configure') +const { ok } = require('../lib/fetch') + +module.exports = configure(({ fetch, apiUrl, apiPath, headers }) => { + return async (topic, options) => { + if (!options && typeof topic === 'object') { + options = topic + topic = null + } + + options = options || {} + + const url = `${apiUrl}${apiPath}/pubsub/peers${objectToQuery({ arg: topic })}` + const res = await ok(fetch(url, { + signal: options.signal, + headers: options.headers || headers + })) + const data = await res.json() + return data.Strings || [] + } +}) diff --git a/src/pubsub/publish.js b/src/pubsub/publish.js new file mode 100644 index 0000000..702f432 --- /dev/null +++ b/src/pubsub/publish.js @@ -0,0 +1,43 @@ +'use strict' + +const { Buffer } = require('buffer') +const configure = require('../lib/configure') +const { ok } = require('../lib/fetch') + +module.exports = configure(({ fetch, apiUrl, apiPath, headers }) => { + return async (topic, data, options) => { + options = options || {} + + const url = `${apiUrl}${apiPath}/pubsub/pub?arg=${encodeURIComponent(topic)}&arg=${encodeBuffer(Buffer.from(data))}` + const res = await ok(fetch(url, { + method: 'POST', + signal: options.signal, + headers: options.headers || headers + })) + + return res.text() + } +}) + +function encodeBuffer (buf) { + let uriEncoded = '' + for (const byte of buf) { + // https://tools.ietf.org/html/rfc3986#page-14 + // ALPHA (%41-%5A and %61-%7A), DIGIT (%30-%39), hyphen (%2D), period (%2E), + // underscore (%5F), or tilde (%7E) + if ( + (byte >= 0x41 && byte <= 0x5A) || + (byte >= 0x61 && byte <= 0x7A) || + (byte >= 0x30 && byte <= 0x39) || + (byte === 0x2D) || + (byte === 0x2E) || + (byte === 0x5F) || + (byte === 0x7E) + ) { + uriEncoded += String.fromCharCode(byte) + } else { + uriEncoded += `%${byte.toString(16).padStart(2, '0')}` + } + } + return uriEncoded +} diff --git a/src/pubsub/subscribe.js b/src/pubsub/subscribe.js new file mode 100644 index 0000000..765527e --- /dev/null +++ b/src/pubsub/subscribe.js @@ -0,0 +1,58 @@ +'use strict' + +const ndjson = require('iterable-ndjson') +const { objectToQuery } = require('../lib/querystring') +const configure = require('../lib/configure') +const { ok, toIterable } = require('../lib/fetch') +const SubscriptionTracker = require('./subscription-tracker') + +module.exports = configure(({ fetch, apiUrl, apiPath, headers }) => { + const subsTracker = SubscriptionTracker.singleton() + + return async (topic, handler, options) => { + options = options || {} + options.signal = subsTracker.subscribe(topic, handler, options.signal) + + const qs = objectToQuery({ arg: topic, discover: options.discover }) + const url = `${apiUrl}${apiPath}/pubsub/sub${qs}` + let res + + try { + res = await ok(fetch(url, { + method: 'POST', + signal: options.signal, + headers: options.headers || headers + })) + } catch (err) { // Initial subscribe fail, ensure we clean up + subsTracker.unsubscribe(topic, handler) + throw err + } + + const onError = options.onError || (err => console.error(err)) + + ;(async () => { + try { + for await (const msg of ndjson(toIterable(res.body))) { + try { + handler({ + from: Buffer.from(msg.from, 'base64'), + data: Buffer.from(msg.data, 'base64'), + seqno: Buffer.from(msg.seqno, 'base64'), + topicIDs: msg.topicIDs + }) + } catch (err) { + onError(err, false) // Not fatal + } + } + } catch (err) { + // FIXME: In testing with Chrome, err.type is undefined (should not be!) + // Temporarily use the name property instead. + if (err.type !== 'aborted' && err.name !== 'AbortError') { + onError(err, true) // Fatal + } + } finally { + subsTracker.unsubscribe(topic, handler) + } + })() + } +}) diff --git a/src/pubsub/subscription-tracker.js b/src/pubsub/subscription-tracker.js new file mode 100644 index 0000000..bbd7c2d --- /dev/null +++ b/src/pubsub/subscription-tracker.js @@ -0,0 +1,52 @@ +'use strict' + +const AbortController = require('abort-controller') + +class SubscriptionTracker { + constructor () { + this._subs = new Map() + } + + static singleton () { + if (SubscriptionTracker.instance) return SubscriptionTracker.instance + SubscriptionTracker.instance = new SubscriptionTracker() + return SubscriptionTracker.instance + } + + subscribe (topic, handler, signal) { + const topicSubs = this._subs.get(topic) || [] + + if (topicSubs.find(s => s.handler === handler)) { + throw new Error(`Already subscribed to ${topic} with this handler`) + } + + // Create controller so a call to unsubscribe can cancel the request + const controller = new AbortController() + + this._subs.set(topic, [{ handler, controller }].concat(topicSubs)) + + // If there is an external signal, forward the abort event + if (signal) { + signal.addEventListener('abort', () => this.unsubscribe(topic, handler)) + } + + return controller.signal + } + + unsubscribe (topic, handler) { + const subs = this._subs.get(topic) || [] + let unsubs + + if (handler) { + this._subs.set(topic, subs.filter(s => s.handler !== handler)) + unsubs = subs.filter(s => s.handler === handler) + } else { + this._subs.set(topic, []) + unsubs = subs + } + + unsubs.forEach(s => s.controller.abort()) + } +} + +module.exports = SubscriptionTracker diff --git a/src/pubsub/unsubscribe.js b/src/pubsub/unsubscribe.js new file mode 100644 index 0000000..3789109 --- /dev/null +++ b/src/pubsub/unsubscribe.js @@ -0,0 +1,9 @@ +'use strict' + +const configure = require('../lib/configure') +const SubscriptionTracker = require('./subscription-tracker') + +module.exports = configure(({ fetch, apiUrl, apiPath, headers }) => { + const subsTracker = SubscriptionTracker.singleton() + return async (topic, handler) => subsTracker.unsubscribe(topic, handler) +}) diff --git a/src/swarm/connect.js b/src/swarm/connect.js index 314fcb5..82cab96 100644 --- a/src/swarm/connect.js +++ b/src/swarm/connect.js @@ -1,6 +1,6 @@ 'use strict' -const QueryString = require('querystring') +const { objectToQuery } = require('../lib/querystring') const configure = require('../lib/configure') const { ok } = require('../lib/fetch') @@ -9,9 +9,8 @@ module.exports = configure(({ fetch, apiUrl, apiPath, headers }) => { addrs = Array.isArray(addrs) ? addrs : [addrs] options = options || {} - const qs = { arg: addrs.map(a => a.toString()) } - - const url = `${apiUrl}${apiPath}/swarm/connect?${QueryString.stringify(qs)}` + const qs = objectToQuery({ arg: addrs.map(a => a.toString()) }) + const url = `${apiUrl}${apiPath}/swarm/connect${qs}` const res = await ok(fetch(url, { signal: options.signal, headers: options.headers || headers diff --git a/src/swarm/peers.js b/src/swarm/peers.js index 2937e3b..045decd 100644 --- a/src/swarm/peers.js +++ b/src/swarm/peers.js @@ -1,6 +1,6 @@ 'use strict' -const QueryString = require('querystring') +const { objectToQuery } = require('../lib/querystring') const configure = require('../lib/configure') const { ok } = require('../lib/fetch') @@ -8,20 +8,13 @@ module.exports = configure(({ fetch, apiUrl, apiPath, headers }) => { return async options => { options = options || {} - const qs = {} - - if (options.verbose) { - qs.verbose = true - } else { - if (options.streams) { - qs.streams = true - } - if (options.latency) { - qs.latency = true - } - } + const qs = objectToQuery({ + verbose: options.verbose, + streams: options.streams, + latency: options.latency + }) - const url = `${apiUrl}${apiPath}/swarm/peers?${QueryString.stringify(qs)}` + const url = `${apiUrl}${apiPath}/swarm/peers${qs}` const res = await ok(fetch(url, { signal: options.signal, headers: options.headers || headers diff --git a/test/interface.spec.js b/test/interface.spec.js index 67536ba..f91b952 100644 --- a/test/interface.spec.js +++ b/test/interface.spec.js @@ -336,20 +336,22 @@ describe('interface-ipfs-core tests', () => { initOptions: { bits: 1024, profile: 'test' } } }), { - // skip: isNode ? [ - // // pubsub.subscribe - // isWindows ? { - // name: 'should send/receive 100 messages', - // reason: 'FIXME https://github.com/ipfs/interface-ipfs-core/pull/188#issuecomment-354673246 and https://github.com/ipfs/go-ipfs/issues/4778' - // } : null, - // isWindows ? { - // name: 'should receive multiple messages', - // reason: 'FIXME https://github.com/ipfs/interface-ipfs-core/pull/188#issuecomment-354673246 and https://github.com/ipfs/go-ipfs/issues/4778' - // } : null - // ] : { - // reason: 'FIXME pubsub is not supported in the browser https://github.com/ipfs/js-ipfs-http-client/issues/518' - // } - skip: { reason: 'LITE: not implemented yet' } + skip: [ + { + name: 'should error on string messags', + reason: 'LITE: revisit, allowed here' + } + ].concat(isNode ? [] : [ + { + name: 'should subscribe and unsubscribe 10 times', + reason: 'LITE: Max 6 open XHR requests in the browser - https://stackoverflow.com/questions/561046/how-many-concurrent-ajax-xmlhttprequest-requests-are-allowed-in-popular-browse' + }, + { + name: 'should subscribe 10 handlers and unsunscribe once with no reference to the handlers', + reason: 'LITE: Max 6 open XHR requests in the browser - https://stackoverflow.com/questions/561046/how-many-concurrent-ajax-xmlhttprequest-requests-are-allowed-in-popular-browse' + } + ]), + only: true }) tests.repo(defaultCommon, { skip: { reason: 'LITE: not implemented yet' } }) diff --git a/test/utils/core-adapter/add-from-url.js b/test/utils/core-adapter/add-from-url.js index 14b54b4..c1a6642 100644 --- a/test/utils/core-adapter/add-from-url.js +++ b/test/utils/core-adapter/add-from-url.js @@ -1,12 +1,10 @@ 'use strict' -const callbackify = require('../../../src/lib/callbackify') - -module.exports = ipfsLite => callbackify(async (...args) => { +module.exports = ipfsLite => async (...args) => { const res = await ipfsLite.addFromURL(...args) return res.map(({ name, hash, size }) => ({ path: name, hash, size: parseInt(size) })) -}, { minArgs: 1 }) +} diff --git a/test/utils/core-adapter/add.js b/test/utils/core-adapter/add.js index 15b7e5f..2e26176 100644 --- a/test/utils/core-adapter/add.js +++ b/test/utils/core-adapter/add.js @@ -1,12 +1,10 @@ 'use strict' -const callbackify = require('../../../src/lib/callbackify') - -module.exports = ipfsLite => callbackify(async (...args) => { +module.exports = ipfsLite => async (...args) => { const res = await ipfsLite.add(...args) return res.map(({ name, hash, size }) => ({ path: name, hash, size: parseInt(size) })) -}, { minArgs: 1 }) +} diff --git a/test/utils/core-adapter/bitswap/stat.js b/test/utils/core-adapter/bitswap/stat.js index e295d0d..5f88065 100644 --- a/test/utils/core-adapter/bitswap/stat.js +++ b/test/utils/core-adapter/bitswap/stat.js @@ -1,9 +1,8 @@ 'use strict' const Big = require('bignumber.js') -const callbackify = require('../../../../src/lib/callbackify') -module.exports = ipfsLite => callbackify(async () => { +module.exports = ipfsLite => async () => { const stats = await ipfsLite.bitswap.stat() stats.blocksReceived = new Big(stats.blocksReceived) stats.dataReceived = new Big(stats.DataReceived) @@ -12,4 +11,4 @@ module.exports = ipfsLite => callbackify(async () => { stats.dupBlksReceived = new Big(stats.DupBlksReceived) stats.dupDataReceived = new Big(stats.DupDataReceived) return stats -}) +} diff --git a/test/utils/core-adapter/bitswap/wantlist.js b/test/utils/core-adapter/bitswap/wantlist.js index 5aae758..01a77a4 100644 --- a/test/utils/core-adapter/bitswap/wantlist.js +++ b/test/utils/core-adapter/bitswap/wantlist.js @@ -1,8 +1,6 @@ 'use strict' -const callbackify = require('../../../../src/lib/callbackify') - -module.exports = ipfsLite => callbackify(async (...args) => { +module.exports = ipfsLite => async (...args) => { const list = await ipfsLite.bitswap.wantlist(...args) return { Keys: list.map(cid => ({ '/': cid })) } -}) +} diff --git a/test/utils/core-adapter/block/get.js b/test/utils/core-adapter/block/get.js index 37faec2..8fae479 100644 --- a/test/utils/core-adapter/block/get.js +++ b/test/utils/core-adapter/block/get.js @@ -2,9 +2,8 @@ const CID = require('cids') const Block = require('ipfs-block') -const callbackify = require('../../../../src/lib/callbackify') -module.exports = ipfsLite => callbackify(async (cid, options) => { +module.exports = ipfsLite => async (cid, options) => { const data = await ipfsLite.block.get(cid, options) return new Block(data, new CID(cid)) -}) +} diff --git a/test/utils/core-adapter/block/put.js b/test/utils/core-adapter/block/put.js index 3ab150d..9129374 100644 --- a/test/utils/core-adapter/block/put.js +++ b/test/utils/core-adapter/block/put.js @@ -3,9 +3,8 @@ const CID = require('cids') const Multihash = require('multihashes') const Block = require('ipfs-block') -const callbackify = require('../../../../src/lib/callbackify') -module.exports = ipfsLite => callbackify(async (block, options) => { +module.exports = ipfsLite => async (block, options) => { options = options || {} // Extract options from passed CID @@ -22,4 +21,4 @@ module.exports = ipfsLite => callbackify(async (block, options) => { const { key } = await ipfsLite.block.put(block, options) return new Block(block, new CID(key)) -}) +} diff --git a/test/utils/core-adapter/index.js b/test/utils/core-adapter/index.js index ffe2872..62bbd4c 100644 --- a/test/utils/core-adapter/index.js +++ b/test/utils/core-adapter/index.js @@ -1,22 +1,31 @@ 'use strict' +const callbackify = require('../../../src/lib/callbackify') + // TODO: extract as separate module module.exports = ipfsLite => { const adapter = { - add: require('./add')(ipfsLite), + add: callbackify(require('./add')(ipfsLite), { minArgs: 1 }), addPullStream: require('./add-pull-stream')(ipfsLite), - addFromURL: require('./add-from-url')(ipfsLite), + addFromURL: callbackify(require('./add-from-url')(ipfsLite), { minArgs: 1 }), bitswap: { - stat: require('./bitswap/stat')(ipfsLite), - wantlist: require('./bitswap/wantlist')(ipfsLite) + stat: callbackify(require('./bitswap/stat')(ipfsLite)), + wantlist: callbackify(require('./bitswap/wantlist')(ipfsLite)) }, block: { - get: require('./block/get')(ipfsLite), - put: require('./block/put')(ipfsLite), + get: callbackify(require('./block/get')(ipfsLite)), + put: callbackify(require('./block/put')(ipfsLite)), stat: ipfsLite.block.stat }, - ls: require('./ls')(ipfsLite), - lsPullStream: require('./ls-pull-stream')(ipfsLite) + ls: callbackify(require('./ls')(ipfsLite)), + lsPullStream: require('./ls-pull-stream')(ipfsLite), + pubsub: { + ls: ipfsLite.pubsub.ls, + peers: ipfsLite.pubsub.peers, + publish: ipfsLite.pubsub.publish, + subscribe: callbackify(require('./pubsub/subscribe')(ipfsLite), { minArgs: 2 }), + unsubscribe: callbackify(require('./pubsub/unsubscribe')(ipfsLite), { minArgs: 2 }) + } } return new Proxy(ipfsLite, { diff --git a/test/utils/core-adapter/ls/index.js b/test/utils/core-adapter/ls/index.js index ff10f3f..5636d64 100644 --- a/test/utils/core-adapter/ls/index.js +++ b/test/utils/core-adapter/ls/index.js @@ -1,9 +1,8 @@ 'use strict' -const callbackify = require('../../../../src/lib/callbackify') const linkTypeToString = require('./link-type-to-string') -module.exports = ipfsLite => callbackify(async (...args) => { +module.exports = ipfsLite => async (...args) => { const res = await ipfsLite.ls(...args) return res.map(({ name, hash, size, type }) => ({ depth: 1, @@ -13,4 +12,4 @@ module.exports = ipfsLite => callbackify(async (...args) => { size, type: linkTypeToString(type) })) -}) +} diff --git a/test/utils/core-adapter/pubsub/handler-map.js b/test/utils/core-adapter/pubsub/handler-map.js new file mode 100644 index 0000000..c08549c --- /dev/null +++ b/test/utils/core-adapter/pubsub/handler-map.js @@ -0,0 +1 @@ +module.exports = new Map() diff --git a/test/utils/core-adapter/pubsub/subscribe.js b/test/utils/core-adapter/pubsub/subscribe.js new file mode 100644 index 0000000..a6f90b0 --- /dev/null +++ b/test/utils/core-adapter/pubsub/subscribe.js @@ -0,0 +1,12 @@ +'use strict' + +const CID = require('cids') +const HandlerMap = require('./handler-map') + +module.exports = ipfsLite => (topic, handler, options) => { + HandlerMap.set(handler, (msg, fatal) => { + handler({ ...msg, from: new CID(msg.from).toString() }, fatal) + }) + + return ipfsLite.pubsub.subscribe(topic, HandlerMap.get(handler), options) +} diff --git a/test/utils/core-adapter/pubsub/unsubscribe.js b/test/utils/core-adapter/pubsub/unsubscribe.js new file mode 100644 index 0000000..586ba2f --- /dev/null +++ b/test/utils/core-adapter/pubsub/unsubscribe.js @@ -0,0 +1,9 @@ +'use strict' + +const HandlerMap = require('./handler-map') + +module.exports = ipfsLite => (topic, handler) => { + const adapterHandler = HandlerMap.get(handler) + HandlerMap.delete(handler) + return ipfsLite.pubsub.unsubscribe(topic, adapterHandler) +} From c9a43e637f05860a3fb2d151064866d32eefbc6a Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 8 Jul 2019 13:24:06 +0100 Subject: [PATCH 2/8] chore: appease linter --- src/lib/querystring.js | 2 ++ src/pubsub/subscribe.js | 1 + src/pubsub/unsubscribe.js | 1 + test/utils/core-adapter/pubsub/handler-map.js | 2 ++ 4 files changed, 6 insertions(+) diff --git a/src/lib/querystring.js b/src/lib/querystring.js index c074ad2..e0ffc9a 100644 --- a/src/lib/querystring.js +++ b/src/lib/querystring.js @@ -1,3 +1,5 @@ +'use strict' + const QueryString = require('querystring') // Convert an object to a query string INCLUDING leading ? diff --git a/src/pubsub/subscribe.js b/src/pubsub/subscribe.js index 765527e..4ea7658 100644 --- a/src/pubsub/subscribe.js +++ b/src/pubsub/subscribe.js @@ -28,6 +28,7 @@ module.exports = configure(({ fetch, apiUrl, apiPath, headers }) => { throw err } + // eslint-disable-next-line no-console const onError = options.onError || (err => console.error(err)) ;(async () => { diff --git a/src/pubsub/unsubscribe.js b/src/pubsub/unsubscribe.js index 3789109..3caef6b 100644 --- a/src/pubsub/unsubscribe.js +++ b/src/pubsub/unsubscribe.js @@ -5,5 +5,6 @@ const SubscriptionTracker = require('./subscription-tracker') module.exports = configure(({ fetch, apiUrl, apiPath, headers }) => { const subsTracker = SubscriptionTracker.singleton() + // eslint-disable-next-line require-await return async (topic, handler) => subsTracker.unsubscribe(topic, handler) }) diff --git a/test/utils/core-adapter/pubsub/handler-map.js b/test/utils/core-adapter/pubsub/handler-map.js index c08549c..f7e0c54 100644 --- a/test/utils/core-adapter/pubsub/handler-map.js +++ b/test/utils/core-adapter/pubsub/handler-map.js @@ -1 +1,3 @@ +'use strict' + module.exports = new Map() From 98a825fc94259fa89349e73b1121dc31c73f2ca8 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 8 Jul 2019 13:29:36 +0100 Subject: [PATCH 3/8] fix: remove .only --- README.md | 5 ++--- test/interface.spec.js | 3 +-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 1e50e1b..f484d6c 100644 --- a/README.md +++ b/README.md @@ -102,12 +102,11 @@ Instead of a local installation (and bundling) you may request a remote copy of To always request the latest version, use the following: ```html - - - ``` +You can also use the un-minified version, just remove ".min" from the URL. + For maximum security you may also decide to: * Reference a specific version of the IPFS HTTP API client (to prevent unexpected breaking changes when a newer latest version is published) diff --git a/test/interface.spec.js b/test/interface.spec.js index f91b952..0d39900 100644 --- a/test/interface.spec.js +++ b/test/interface.spec.js @@ -350,8 +350,7 @@ describe('interface-ipfs-core tests', () => { name: 'should subscribe 10 handlers and unsunscribe once with no reference to the handlers', reason: 'LITE: Max 6 open XHR requests in the browser - https://stackoverflow.com/questions/561046/how-many-concurrent-ajax-xmlhttprequest-requests-are-allowed-in-popular-browse' } - ]), - only: true + ]) }) tests.repo(defaultCommon, { skip: { reason: 'LITE: not implemented yet' } }) From 895292607514f9d36591e83ad172f173ee4214b5 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 8 Jul 2019 15:19:29 +0100 Subject: [PATCH 4/8] fix: remove ua headre change - cannot in browsers --- src/lib/configure.browser.js | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/lib/configure.browser.js b/src/lib/configure.browser.js index c96a592..55fb9e4 100644 --- a/src/lib/configure.browser.js +++ b/src/lib/configure.browser.js @@ -11,10 +11,6 @@ module.exports = create => config => { config.apiPath = config.apiPath || '/api/v0' config.headers = new Headers(config.headers) - if (!config.headers.has('User-Agent')) { - config.headers.append('User-Agent', 'ipfs-http-client-lite/0.0.0') - } - return create(config) } From f9e71f755fe5595bba82cf7a464a52c69f6d8fcc Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 8 Jul 2019 16:29:44 +0100 Subject: [PATCH 5/8] feat: add pubsub example --- examples/pubsub.html | 170 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 170 insertions(+) create mode 100644 examples/pubsub.html diff --git a/examples/pubsub.html b/examples/pubsub.html new file mode 100644 index 0000000..09ef64e --- /dev/null +++ b/examples/pubsub.html @@ -0,0 +1,170 @@ + + + + Pubsub in the browser + + + + +
+ + + +

Pubsub

+
+
+
API URL
+ + +
+
+
Connect to peer
+ + +
+
+
Subscribe to pubsub topic
+ + +
+
+
Send pubsub message
+ + +
+
+
Console
+
+
+
+ + + + + From 1302d4d6c68f9152c57e7fc2086fc29a226d267e Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 8 Jul 2019 22:16:35 +0100 Subject: [PATCH 6/8] feat: enter to submit --- examples/pubsub.html | 63 ++++++++++++++++++++++++++++++-------------- 1 file changed, 43 insertions(+), 20 deletions(-) diff --git a/examples/pubsub.html b/examples/pubsub.html index 09ef64e..9439ee8 100644 --- a/examples/pubsub.html +++ b/examples/pubsub.html @@ -41,13 +41,12 @@

Pubsub

From c15c463f78b3e301be404675ec813cdda8b854b1 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 8 Jul 2019 22:26:23 +0100 Subject: [PATCH 7/8] fix: add timeout --- test/utils/core-adapter/pubsub/unsubscribe.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/utils/core-adapter/pubsub/unsubscribe.js b/test/utils/core-adapter/pubsub/unsubscribe.js index 586ba2f..8b8d2c7 100644 --- a/test/utils/core-adapter/pubsub/unsubscribe.js +++ b/test/utils/core-adapter/pubsub/unsubscribe.js @@ -2,8 +2,9 @@ const HandlerMap = require('./handler-map') -module.exports = ipfsLite => (topic, handler) => { +module.exports = ipfsLite => async (topic, handler) => { const adapterHandler = HandlerMap.get(handler) HandlerMap.delete(handler) - return ipfsLite.pubsub.unsubscribe(topic, adapterHandler) + await ipfsLite.pubsub.unsubscribe(topic, adapterHandler) + await new Promise(resolve => setTimeout(resolve, 100)) } From 3665b85e80cfa3072fc70b39976a749b47768f09 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 8 Jul 2019 22:47:52 +0100 Subject: [PATCH 8/8] fix: temporarily disable firefox testing --- .travis.yml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/.travis.yml b/.travis.yml index 2f05df9..17e3e19 100644 --- a/.travis.yml +++ b/.travis.yml @@ -34,11 +34,5 @@ jobs: chrome: stable script: npx aegir test -t browser -t webworker - - stage: test - name: firefox - addons: - firefox: latest - script: npx aegir test -t browser -t webworker -- --browsers FirefoxHeadless - notifications: email: false