diff --git a/package.json b/package.json
index 5fb97aa240..a3437153ea 100644
--- a/package.json
+++ b/package.json
@@ -93,7 +93,9 @@
     "byteman": "^1.3.5",
     "cids": "~0.5.3",
     "debug": "^3.1.0",
+    "del": "^3.0.0",
     "err-code": "^1.1.2",
+    "fast-glob": "^2.2.2",
     "file-type": "^8.1.0",
     "filesize": "^3.6.1",
     "fnv1a": "^1.0.1",
@@ -101,7 +103,6 @@
     "get-folder-size": "^2.0.0",
     "glob": "^7.1.2",
     "hapi": "^16.6.2",
-    "hapi-set-header": "^1.0.2",
     "hoek": "^5.0.3",
     "human-to-milliseconds": "^1.0.0",
     "interface-datastore": "~0.4.2",
@@ -167,9 +168,11 @@
     "read-pkg-up": "^4.0.0",
     "readable-stream": "2.3.6",
     "receptacle": "^1.3.2",
+    "stream-concat": "^0.3.0",
     "stream-to-pull-stream": "^1.7.2",
     "tar-stream": "^1.6.1",
     "temp": "~0.8.3",
+    "tempy": "^0.2.1",
     "through2": "^2.0.3",
     "update-notifier": "^2.5.0",
     "yargs": "^12.0.1",
diff --git a/src/core/components/files.js b/src/core/components/files.js
index d575d893fa..6d7122e8a7 100644
--- a/src/core/components/files.js
+++ b/src/core/components/files.js
@@ -13,7 +13,7 @@ const deferred = require('pull-defer')
 const waterfall = require('async/waterfall')
 const isStream = require('is-stream')
 const isSource = require('is-pull-stream').isSource
-const Duplex = require('readable-stream').Duplex
+const Duplex = require('stream').Duplex
 const OtherBuffer = require('buffer').Buffer
 const CID = require('cids')
 const toB58String = require('multihashes').toB58String
@@ -136,6 +136,7 @@ class AddHelper extends Duplex {
       if (end instanceof Error) {
         this.emit('error', end)
       }
+      this.push(null)
     } else {
       this.push(data)
     }
diff --git a/src/http/api/resources/files.js b/src/http/api/resources/files.js
index 3e2fb28d38..64d0a06afe 100644
--- a/src/http/api/resources/files.js
+++ b/src/http/api/resources/files.js
@@ -1,5 +1,7 @@
 'use strict'
 
+const path = require('path')
+const fs = require('fs')
 const mh = require('multihashes')
 const multipart = require('ipfs-multipart')
 const debug = require('debug')
@@ -10,10 +12,21 @@ const pull = require('pull-stream')
 const toPull = require('stream-to-pull-stream')
 const pushable = require('pull-pushable')
 const each = require('async/each')
+const content = require('content')
 const toStream = require('pull-stream-to-stream')
 const abortable = require('pull-abortable')
 const Joi = require('joi')
+const pump = require('pump')
+const tempy = require('tempy')
 const ndjson = require('pull-ndjson')
+const {
+  parseChunkedInput,
+  createMultipartReply,
+  matchMultipartEnd,
+  processAndAdd
+} = require('../utils/add-experimental')
+
+const filesDir = tempy.directory()
 
 exports = module.exports
 
@@ -182,6 +195,7 @@ exports.add = {
 
     parser.on('file', (fileName, fileStream) => {
       fileName = decodeURIComponent(fileName)
+
       const filePair = {
         path: fileName,
         content: toPull(fileStream)
@@ -192,7 +206,6 @@ exports.add = {
 
     parser.on('directory', (directory) => {
       directory = decodeURIComponent(directory)
-
       fileAdder.push({
         path: directory,
         content: ''
@@ -220,7 +233,7 @@ exports.add = {
       rawLeaves: request.query['raw-leaves'],
      progress: request.query.progress ? progressHandler : null,
       onlyHash: request.query['only-hash'],
-      hashAlg: request.query['hash'],
+      hashAlg: request.query.hash,
       wrapWithDirectory: request.query['wrap-with-directory'],
       pin: request.query.pin,
       chunker: request.query.chunker
@@ -282,6 +295,78 @@
   }
 }
 
+exports.addExperimental = {
+  validate: {
+    query: Joi.object()
+      .keys({
+        'cid-version': Joi.number().integer().min(0).max(1).default(0),
+        'raw-leaves': Joi.boolean(),
+        'only-hash': Joi.boolean(),
+        pin: Joi.boolean().default(true),
+        'wrap-with-directory': Joi.boolean(),
+        chunker: Joi.string()
+      })
+      // TODO: needed until "recursive", "stream-channels", etc. are validated
+      .options({ allowUnknown: true }),
+    headers: {
+      'content-range': Joi.string().regex(/(\d+)-(\d+)\/(\d+|\*)/),
+      'x-chunked-input': Joi.string().regex(/^uuid="([^"]+)";\s*index=(\d*)/i)
+    },
+    options: {
+      allowUnknown: true
+    }
+  },
+
+  handler: (request, reply) => {
+    const chunkedInput = parseChunkedInput(request)
+
+    // non-chunked upload
+    if (!chunkedInput) {
+      createMultipartReply(
+        request.payload,
+        request,
+        reply,
+        (err) => {
+          if (err) {
+            return reply(err)
+          }
+        }
+      )
+
+      return
+    }
+
+    // chunked upload
+    const [uuid, index] = chunkedInput
+    const [, , , total] = request.headers['content-range'].match(/(\d+)-(\d+)\/(\d+|\*)/)
+    const file = path.join(filesDir, uuid) + '-' + index
+
+    // TODO: validate duplicate and missing chunks once resumable and concurrent requests are supported
+
+    pump(
+      request.payload,
+      fs.createWriteStream(file),
+      (err) => {
+        if (err) {
+          return reply(err)
+        }
+        const boundary = content.type(request.headers['content-type']).boundary
+        matchMultipartEnd(file, boundary, (err, isEnd) => {
+          if (err) {
+            return reply(err)
+          }
+
+          if (isEnd) {
+            processAndAdd(uuid, filesDir, request, reply)
+          } else {
+            reply({ Bytes: total })
+          }
+        })
+      }
+    )
+  }
+}
+
 exports.immutableLs = {
   // uses common parseKey method that returns a `key`
   parseArgs: exports.parseKey,
diff --git a/src/http/api/routes/files.js b/src/http/api/routes/files.js
index fc8222f180..0dd95af105 100644
--- a/src/http/api/routes/files.js
+++ b/src/http/api/routes/files.js
@@ -1,7 +1,7 @@
 'use strict'
 
-const resources = require('./../resources')
 const mfs = require('ipfs-mfs/http')
+const resources = require('./../resources')
 
 module.exports = (server) => {
   const api = server.select('API')
@@ -37,13 +37,28 @@ module.exports = (server) => {
     config: {
       payload: {
         parse: false,
-        output: 'stream'
+        output: 'stream',
+        maxBytes: 1000 * 1024 * 1024
       },
       handler: resources.files.add.handler,
       validate: resources.files.add.validate
     }
   })
 
+  api.route({
+    method: 'POST',
+    path: '/api/v0/add-experimental',
+    config: {
+      payload: {
+        parse: false,
+        output: 'stream',
+        maxBytes: 1000 * 1024 * 1024
+      },
+      validate: resources.files.addExperimental.validate,
+      handler: resources.files.addExperimental.handler
+    }
+  })
+
   api.route({
     // TODO fix method
     method: '*',
diff --git a/src/http/api/utils/add-experimental.js b/src/http/api/utils/add-experimental.js
new file mode 100644
index 0000000000..53df513c67
--- /dev/null
+++ b/src/http/api/utils/add-experimental.js
@@ -0,0 +1,148 @@
+'use strict'
+
+const fs = require('fs')
+const path = require('path')
+const { EOL } = require('os')
+const { Readable } = require('stream')
+const glob = require('fast-glob')
+const StreamConcat = require('stream-concat')
+const del = require('del')
+const content = require('content')
+const { Parser } = require('ipfs-multipart')
+
+const processAndAdd = (uuid, filesDir, request, reply) => {
+  // all chunks have been received
+  // TODO: the total size is known here, so the expected number of chunks could be validated
+  const base = path.join(filesDir, uuid) + '-'
+  const pattern = base + '*'
+  const files = glob.sync(pattern)
+
+  files.sort((a, b) => {
+    return Number(a.replace(base, '')) - Number(b.replace(base, ''))
+  })
+
+  let fileIndex = 0
+  const nextStream = () => fileIndex === files.length
+    ? null
+    : fs.createReadStream(files[fileIndex++])
+
+  createMultipartReply(
+    new StreamConcat(nextStream),
+    request,
+    reply,
+    (err) => {
+      if (err) {
+        return reply(err)
+      }
+      del(pattern, { force: true })
+        .then(paths => {
+          console.log('Deleted files and folders:\n', paths.join('\n'))
+        })
+        .catch(console.error)
+    }
+  )
+}
+
+const matchMultipartEnd = (file, boundary, cb) => {
+  const buffer = Buffer.alloc(56)
+  fs.open(file, 'r', (err, fd) => {
+    if (err) {
+      return cb(err)
+    }
+
+    fs.fstat(fd, (err, stats) => {
+      if (err) {
+        return cb(err)
+      }
+
+      fs.read(fd, buffer, 0, buffer.length, stats.size - 58, (err, bytesRead, buf) => {
+        fs.close(fd, () => {})
+        if (err) {
+          return cb(err)
+        }
+        cb(null, buf.toString().includes(boundary))
+      })
+    })
+  })
+}
+
+const parseChunkedInput = (request) => {
+  const input = request.headers['x-chunked-input']
+  const regex = /^uuid="([^"]+)";\s*index=(\d*)/i
+
+  if (!input) {
+    return null
+  }
+  const match = input.match(regex)
+
+  return [match[1], Number(match[2])]
+}
+
+const createMultipartReply = (readStream, request, reply, cb) => {
+  const boundary = content.type(request.headers['content-type']).boundary
+  const ipfs = request.server.app.ipfs
+  const query = request.query
+  const parser = new Parser({ boundary: boundary })
+  const replyStream = new Readable({ read: () => {} })
+  const serialize = d => JSON.stringify(d) + EOL
+  const progressHandler = (bytes) => {
+    replyStream.push(serialize({ Bytes: bytes }))
+  }
+  // ipfs add options
+  const options = {
+    cidVersion: query['cid-version'],
+    rawLeaves: query['raw-leaves'],
+    progress: query.progress ? progressHandler : null,
+    onlyHash: query['only-hash'],
+    hashAlg: query.hash,
+    wrapWithDirectory: query['wrap-with-directory'],
+    pin: query.pin,
+    chunker: query.chunker
+  }
+  const addStream = ipfs.files.addReadableStream(options)
+
+  // Set up the add pipeline
+  addStream.on('data', file => {
+    replyStream.push(serialize({
+      Name: file.path,
+      Hash: file.hash,
+      Size: file.size
+    }))
+  })
+  addStream.on('end', () => replyStream.push(null))
+  addStream.on('error', cb)
+
+  // Set up the multipart parser
+  parser.on('file', (fileName, fileStream) => {
+    addStream.write({
+      path: decodeURIComponent(fileName),
+      content: fileStream
+    })
+  })
+  parser.on('directory', (directory) => {
+    addStream.write({
+      path: decodeURIComponent(directory),
+      content: ''
+    })
+  })
+  parser.on('end', () => {
+    addStream.end()
+  })
+  parser.on('error', cb)
+
+  // Send replyStream to reply
+  reply(replyStream)
+    .header('x-chunked-output', '1')
+    .header('content-encoding', 'identity') // stop gzip from buffering, see https://github.com/hapijs/hapi/issues/2975
+    .header('content-type', 'application/json')
+
+  // start piping data into the multipart parser
+  readStream.pipe(parser)
+}
+
+module.exports = {
+  processAndAdd,
+  matchMultipartEnd,
+  parseChunkedInput,
+  createMultipartReply
+}
diff --git a/src/http/index.js b/src/http/index.js
index ed1edce219..706365e717 100644
--- a/src/http/index.js
+++ b/src/http/index.js
@@ -4,7 +4,6 @@ const series = require('async/series')
 const Hapi = require('hapi')
 const debug = require('debug')
 const multiaddr = require('multiaddr')
-const setHeader = require('hapi-set-header')
 const once = require('once')
 
 const IPFS = require('../core')
@@ -103,7 +102,11 @@ function HttpApi (repo, config, cliArgs) {
       this.server = new Hapi.Server({
         connections: {
           routes: {
-            cors: true
+            cors: {
+              origin: ['*'],
+              additionalHeaders: ['X-Stream-Output', 'X-Chunked-Output', 'X-Content-Length', 'Content-Type', 'Content-Range', 'X-Chunked-Input'],
+              additionalExposedHeaders: ['X-Stream-Output', 'X-Chunked-Output', 'X-Content-Length', 'X-Chunked-Input']
+            }
           }
         },
         debug: process.env.DEBUG ? {
@@ -137,14 +140,6 @@ function HttpApi (repo, config, cliArgs) {
       // load gateway routes
       require('./gateway/routes')(this.server)
 
-      // Set default headers
-      setHeader(this.server,
-        'Access-Control-Allow-Headers',
-        'X-Stream-Output, X-Chunked-Output, X-Content-Length')
-      setHeader(this.server,
-        'Access-Control-Expose-Headers',
-        'X-Stream-Output, X-Chunked-Output, X-Content-Length')
-
       this.server.start(cb)
     })
   },
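
Note for reviewers: the chunked protocol above is driven entirely by the Content-Range and X-Chunked-Input headers. Below is a minimal client sketch, assuming a local daemon; the port, boundary, chunk size, and helper names are illustrative and not part of this changeset.

'use strict'

// Hypothetical client for POST /api/v0/add-experimental (sketch only).
const http = require('http')
const crypto = require('crypto')

const sendChunk = (chunk, uuid, index, offset, total) => {
  return new Promise((resolve, reject) => {
    const req = http.request({
      host: '127.0.0.1',
      port: 5001, // assumed local API port
      method: 'POST',
      path: '/api/v0/add-experimental',
      headers: {
        'content-type': 'multipart/form-data; boundary=boundary123',
        // position of this chunk within the full multipart body
        'content-range': `${offset}-${offset + chunk.length - 1}/${total}`,
        // ties all chunks of one upload together, in order
        'x-chunked-input': `uuid="${uuid}"; index=${index}`
      }
    }, (res) => {
      let body = ''
      res.on('data', (d) => { body += d })
      res.on('end', () => resolve(body))
      res.on('error', reject)
    })
    req.on('error', reject)
    req.end(chunk)
  })
}

// Splits a prebuilt multipart body into fixed-size chunks and uploads them
// sequentially; the server answers { Bytes } until it sees the closing
// boundary, then streams the usual ndjson add results.
const uploadChunked = async (multipartBody, chunkSize) => {
  const uuid = crypto.randomBytes(16).toString('hex')
  let index = 0
  for (let offset = 0; offset < multipartBody.length; offset += chunkSize) {
    const chunk = multipartBody.slice(offset, offset + chunkSize)
    console.log(await sendChunk(chunk, uuid, index++, offset, multipartBody.length))
  }
}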
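
The end-of-upload check in matchMultipartEnd leans on multipart framing: a body always terminates with the closing delimiter --<boundary>-- plus CRLF, so the boundary string appears in the tail of the final chunk only. A self-contained sketch of that check, with assumed values:

'use strict'

// The closing delimiter of a multipart body is `--<boundary>--\r\n`, so the
// boundary shows up near the end of the last chunk written to disk, but not
// near the end of intermediate chunks (values below are assumed).
const boundary = 'boundary123'
const finalChunk = Buffer.from(`...payload bytes...\r\n--${boundary}--\r\n`)
const middleChunk = Buffer.from('...payload bytes only...')

const looksLikeEnd = (chunk) => {
  const tail = chunk.slice(Math.max(0, chunk.length - 58))
  return tail.toString().includes(boundary)
}

console.log(looksLikeEnd(finalChunk))  // true  -> processAndAdd() runs
console.log(looksLikeEnd(middleChunk)) // false -> reply({ Bytes: total })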
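
processAndAdd sorts chunk files numerically by their index suffix because the glob's lexicographic order would place -10 before -2. A small demonstration of the sort used above, with assumed file names:

'use strict'

// Lexicographic order breaks past index 9, hence the numeric sort in
// processAndAdd (file names assumed for illustration).
const base = '/tmp/files/uuid-'
const files = [base + '0', base + '10', base + '2', base + '1']

console.log([...files].sort())
// [ '...-0', '...-1', '...-10', '...-2' ]  wrong order for concatenation

files.sort((a, b) => Number(a.replace(base, '')) - Number(b.replace(base, '')))
console.log(files)
// [ '...-0', '...-1', '...-2', '...-10' ]  chunks in upload order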
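
Because the reply carries x-chunked-output: 1 and newline-delimited JSON, a consumer can split on newlines to separate progress events from add results. A minimal parser sketch, with assumed sample values:

'use strict'

// Minimal consumer for the ndjson reply produced by createMultipartReply;
// the sample lines below are assumed example values.
const parseAddReply = (body) => body
  .split(/\r?\n/)
  .filter(Boolean)
  .map(line => JSON.parse(line))

const sample = '{"Bytes":262144}\n{"Name":"file.txt","Hash":"QmHash","Size":12}\n'
for (const msg of parseAddReply(sample)) {
  if (msg.Bytes) {
    console.log('progress:', msg.Bytes) // progress events from the add pipeline
  } else {
    console.log('added:', msg.Name, msg.Hash)
  }
}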