This repository was archived by the owner on Feb 12, 2024. It is now read-only.

[WIP] feat: support chunked add requests #1540

Closed
wants to merge 10 commits
5 changes: 4 additions & 1 deletion package.json
@@ -93,15 +93,16 @@
"byteman": "^1.3.5",
"cids": "~0.5.3",
"debug": "^3.1.0",
"del": "^3.0.0",
"err-code": "^1.1.2",
"fast-glob": "^2.2.2",
Member: Can we use glob or replace glob with fast-glob?

"file-type": "^8.1.0",
"filesize": "^3.6.1",
"fnv1a": "^1.0.1",
"fsm-event": "^2.1.0",
"get-folder-size": "^2.0.0",
"glob": "^7.1.2",
"hapi": "^16.6.2",
"hapi-set-header": "^1.0.2",
"hoek": "^5.0.3",
"human-to-milliseconds": "^1.0.0",
"interface-datastore": "~0.4.2",
@@ -167,9 +168,11 @@
"read-pkg-up": "^4.0.0",
"readable-stream": "2.3.6",
"receptacle": "^1.3.2",
"stream-concat": "^0.3.0",
Member: Please use ~ for pre 1.0 dependencies
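That is, the entry above would become "stream-concat": "~0.3.0".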

"stream-to-pull-stream": "^1.7.2",
"tar-stream": "^1.6.1",
"temp": "~0.8.3",
"tempy": "^0.2.1",
Member: Can we just use temp?

"through2": "^2.0.3",
"update-notifier": "^2.5.0",
"yargs": "^12.0.1",
3 changes: 2 additions & 1 deletion src/core/components/files.js
@@ -13,7 +13,7 @@ const deferred = require('pull-defer')
const waterfall = require('async/waterfall')
const isStream = require('is-stream')
const isSource = require('is-pull-stream').isSource
const Duplex = require('readable-stream').Duplex
const Duplex = require('stream').Duplex
const OtherBuffer = require('buffer').Buffer
const CID = require('cids')
const toB58String = require('multihashes').toB58String
@@ -136,6 +136,7 @@ class AddHelper extends Duplex {
if (end instanceof Error) {
this.emit('error', end)
}
this.push(null)
} else {
this.push(data)
}
89 changes: 87 additions & 2 deletions src/http/api/resources/files.js
@@ -1,5 +1,7 @@
'use strict'

const path = require('path')
const fs = require('fs')
const mh = require('multihashes')
const multipart = require('ipfs-multipart')
const debug = require('debug')
@@ -10,10 +12,21 @@ const pull = require('pull-stream')
const toPull = require('stream-to-pull-stream')
const pushable = require('pull-pushable')
const each = require('async/each')
const content = require('content')
const toStream = require('pull-stream-to-stream')
const abortable = require('pull-abortable')
const Joi = require('joi')
const pump = require('pump')
const tempy = require('tempy')
const ndjson = require('pull-ndjson')
const {
parseChunkedInput,
createMultipartReply,
matchMultipartEnd,
processAndAdd
} = require('../utils/add-experimental')

const filesDir = tempy.directory()

exports = module.exports

@@ -182,6 +195,7 @@ exports.add = {

parser.on('file', (fileName, fileStream) => {
fileName = decodeURIComponent(fileName)

const filePair = {
path: fileName,
content: toPull(fileStream)
@@ -192,7 +206,6 @@

parser.on('directory', (directory) => {
directory = decodeURIComponent(directory)

fileAdder.push({
path: directory,
content: ''
@@ -220,7 +233,7 @@ exports.add = {
rawLeaves: request.query['raw-leaves'],
progress: request.query.progress ? progressHandler : null,
onlyHash: request.query['only-hash'],
hashAlg: request.query['hash'],
hashAlg: request.query.hash,
wrapWithDirectory: request.query['wrap-with-directory'],
pin: request.query.pin,
chunker: request.query.chunker
@@ -282,6 +295,78 @@ exports.add = {
}
}

exports.addExperimental = {
validate: {
query: Joi.object()
.keys({
'cid-version': Joi.number().integer().min(0).max(1).default(0),
'raw-leaves': Joi.boolean(),
'only-hash': Joi.boolean(),
pin: Joi.boolean().default(true),
'wrap-with-directory': Joi.boolean(),
chunker: Joi.string()
})
// TODO: Necessary until validate "recursive", "stream-channels" etc.
.options({ allowUnknown: true }),
Member: Can we please reuse from exports.add.validate.query?
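One possible way to do that is to define the query schema once and share it between both endpoints (a sketch; addQuerySchema is a hypothetical name):

const addQuerySchema = Joi.object()
  .keys({
    'cid-version': Joi.number().integer().min(0).max(1).default(0),
    'raw-leaves': Joi.boolean(),
    'only-hash': Joi.boolean(),
    pin: Joi.boolean().default(true),
    'wrap-with-directory': Joi.boolean(),
    chunker: Joi.string()
  })
  // necessary until "recursive", "stream-channels" etc. are validated
  .options({ allowUnknown: true })

// exports.add.validate.query = addQuerySchema
// exports.addExperimental.validate.query = addQuerySchema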

headers: {
'content-range': Joi.string().regex(/(\d+)-(\d+)\/(\d+|\*)/),
Member: .required()?

'x-chunked-input': Joi.string().regex(/^uuid="([^"]+)";\s*index=(\d*)/i)
Member: .required()?

Member: We need to validate the UUID much better here as it is concatenated with the temporary directory path and we don't want directory traversal attacks.
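A stricter version of these header rules could make both headers required and pin the uuid to a canonical UUID shape, which would also rule out path traversal (a sketch, assuming clients send standard v4-style UUIDs):

headers: {
  'content-range': Joi.string().regex(/(\d+)-(\d+)\/(\d+|\*)/).required(),
  'x-chunked-input': Joi.string()
    .regex(/^uuid="[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";\s*index=\d+$/i)
    .required()
}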

},
options: {
allowUnknown: true
}
},

handler: (request, reply) => {
const chunkedInput = parseChunkedInput(request)

// non chunked
if (!chunkedInput) {
Member: It's safe to remove this - we won't get here if the headers are being validated using Joi.

createMultipartReply(
request.payload,
request,
reply,
(err) => {
if (err) {
return reply(err)
}
}
)

return
}

// chunked
const [uuid, index] = chunkedInput
const [, , , total] = request.headers['content-range'].match(/(\d+)-(\d+)\/(\d+|\*)/)
Member: Suggestion: parseContentRange(header) function?

Member: parseInt on total?
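The two suggestions above could be folded into one small helper (a sketch; parseContentRange is the hypothetical name from the comment):

const parseContentRange = (header) => {
  // expects values like "0-262143/1048576" or "0-262143/*"
  const match = header.match(/(\d+)-(\d+)\/(\d+|\*)/)
  if (!match) return null
  return {
    start: parseInt(match[1], 10),
    end: parseInt(match[2], 10),
    total: match[3] === '*' ? null : parseInt(match[3], 10)
  }
}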

const file = path.join(filesDir, uuid) + '-' + index

// TODO validate duplicates, missing chunks when resumeable and concurrent request are supported

pump(
request.payload,
fs.createWriteStream(file),
(err) => {
if (err) {
return reply(err)
}
const boundary = content.type(request.headers['content-type']).boundary
Member: Needs a try/catch as it can throw.
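For example (a sketch):

let boundary
try {
  boundary = content.type(request.headers['content-type']).boundary
} catch (err) {
  return reply(err)
}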

matchMultipartEnd(file, boundary, (err, isEnd) => {
if (err) {
return reply(err)
}

if (isEnd) {
processAndAdd(uuid, filesDir, request, reply)
} else {
reply({ Bytes: total })
Member: For every chunk we return the total bytes expected to be uploaded?

}
})
}
)
}
Member: We need a clean up job for failed requests.

}
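One way to address the clean-up comment above is a periodic sweep of the chunk directory (a sketch; the 24h TTL and the use of console.error are assumptions):

const CHUNK_TTL_MS = 24 * 60 * 60 * 1000 // assumption: chunks untouched for 24h are abandoned

const cleanUpStaleChunks = () => {
  fs.readdir(filesDir, (err, names) => {
    if (err) return console.error(err)
    names.forEach((name) => {
      const file = path.join(filesDir, name)
      fs.stat(file, (statErr, stats) => {
        if (statErr) return console.error(statErr)
        if (Date.now() - stats.mtime.getTime() > CHUNK_TTL_MS) {
          fs.unlink(file, (unlinkErr) => unlinkErr && console.error(unlinkErr))
        }
      })
    })
  })
}

setInterval(cleanUpStaleChunks, CHUNK_TTL_MS).unref()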

exports.immutableLs = {
// uses common parseKey method that returns a `key`
parseArgs: exports.parseKey,
19 changes: 17 additions & 2 deletions src/http/api/routes/files.js
@@ -1,7 +1,7 @@
'use strict'

const resources = require('./../resources')
const mfs = require('ipfs-mfs/http')
const resources = require('./../resources')

module.exports = (server) => {
const api = server.select('API')
@@ -37,13 +37,28 @@ module.exports = (server) => {
config: {
payload: {
parse: false,
output: 'stream'
output: 'stream',
maxBytes: 1000 * 1024 * 1024
Member: What's the reason for adding this restriction to /api/v0/add? Is there a restriction in go-ipfs?

},
handler: resources.files.add.handler,
validate: resources.files.add.validate
}
})

api.route({
method: 'POST',
path: '/api/v0/add-experimental',
config: {
payload: {
parse: false,
output: 'stream',
maxBytes: 1000 * 1024 * 1024
Member: How do we pick the default max chunk size? E.g. why 1GB and not 256MB? And is this going to be a hardcoded limit, or a configuration option that can be passed to the js-ipfs constructor?

Member (author): We need this value to be high for the non-chunked path. As for making it configurable, I don't know. What do you think? According to the hapi documentation, maxBytes shouldn't matter when output=stream, but that doesn't seem to be the case. Needs further investigation; maybe it's a matter of upgrading hapi.

Member: I can't think of anything right now. Unless you see a good use case for customizing it, let's go with a hardcoded value for now. Exposing it as a config property can be contributed later in a separate PR.

Member: This should just be the maximum size we're prepared to accept for /api/v0/add.

},
validate: resources.files.addExperimental.validate,
handler: resources.files.addExperimental.handler
}
})

api.route({
// TODO fix method
method: '*',
146 changes: 146 additions & 0 deletions src/http/api/utils/add-experimental.js
@@ -0,0 +1,146 @@
'use strict'

const fs = require('fs')
const path = require('path')
const { EOL } = require('os')
const { Readable } = require('stream')
const glob = require('fast-glob')
const StreamConcat = require('stream-concat')
const del = require('del')
const content = require('content')
const { Parser } = require('ipfs-multipart')

const processAndAdd = (uuid, filesDir, request, reply) => {
// all chunks have been received
// TODO : here we have full size we can calculate the number of chunks to validate we have all the bytes
const base = path.join(filesDir, uuid) + '-'
const pattern = base + '*'
const files = glob.sync(pattern)
Member: Async please :)


files.sort((a, b) => {
return Number(a.replace(base, '')) - Number(b.replace(base, ''))
})
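fast-glob also exposes a promise-based API, so the lookup above could be made asynchronous along these lines (a sketch):

glob(pattern)
  .then((files) => {
    files.sort((a, b) => Number(a.replace(base, '')) - Number(b.replace(base, '')))
    // continue with concatenating the sorted chunk files
  })
  .catch((err) => reply(err))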

let fileIndex = 0
const nextStream = () => fileIndex === files.length
? null
: fs.createReadStream(files[fileIndex++])

createMultipartReply(
new StreamConcat(nextStream),
request,
reply,
(err) => {
if (err) {
return reply(err)
Member: reply will be called twice if there's an error mid-stream. Does Hapi somehow deal with this? In createMultipartReply there are error listeners that call the callback, but they don't do anything to end the replyStream. Does this leave the connection open?

I think we need a test to ensure the correct thing is being done.
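Until that behaviour is tested, one possible guard is to make the reply idempotent (a sketch; replyOnce is a hypothetical helper):

let replied = false
const replyOnce = (response) => {
  if (replied) return
  replied = true
  return reply(response)
}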

}
del(pattern, { force: true })
.then(paths => {
console.log('Deleted files and folders:\n', paths.join('\n'))
})
.catch(console.error)
}
)
}

const matchMultipartEnd = (file, boundary, cb) => {
const buffer = Buffer.alloc(56)
Member: What are the magic numbers here 56 & 58? Would you mind adding comments to explain or refactor to be more obvious?

const fs = require('fs')
fs.open(file, 'r', (err, fd) => {
if (err) {
cb(err)
Member: Missing return

}

fs.fstat(fd, (err, stats) => {
if (err) {
cb(err)
Member: Missing return

}

fs.read(fd, buffer, 0, buffer.length, stats.size - 58, function (e, l, b) {
Member: Do you need a guard here in case position is negative?

cb(null, b.toString().includes(boundary))
Member: Please handle the error and use more descriptive variable names!

})
fs.close(fd)
Member: Move close to read callback?

})
})
}
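Pulling the comments above together, a revised version might look like this (a sketch; the tail length is an assumption intended to cover the closing "--<boundary>--" marker plus surrounding CRLFs, which the original 56/58 constants appear to approximate):

const matchMultipartEnd = (file, boundary, cb) => {
  // read only the tail of the file, where the closing boundary marker would be
  const tailLength = boundary.length + 8 // "\r\n--" + boundary + "--\r\n"
  fs.open(file, 'r', (openErr, fd) => {
    if (openErr) return cb(openErr)
    fs.fstat(fd, (statErr, stats) => {
      if (statErr) return fs.close(fd, () => cb(statErr))
      const length = Math.min(tailLength, stats.size) // guard against short files
      const position = stats.size - length
      const tail = Buffer.alloc(length)
      fs.read(fd, tail, 0, length, position, (readErr, bytesRead, buffer) => {
        fs.close(fd, () => {
          if (readErr) return cb(readErr)
          cb(null, buffer.toString().includes(boundary))
        })
      })
    })
  })
}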

const parseChunkedInput = (request) => {
const input = request.headers['x-chunked-input']
const regex = /^uuid="([^"]+)";\s*index=(\d*)/i

if (!input) {
return null
}
const match = input.match(regex)

return [match[1], Number(match[2])]
Member: Much better to just return an object here with uuid and index property names so it is obvious what they are.

}
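Following that suggestion, the helper could return named properties instead of a positional array (a sketch; callers would then destructure { uuid, index }):

const parseChunkedInput = (request) => {
  const input = request.headers['x-chunked-input']
  if (!input) return null

  const match = input.match(/^uuid="([^"]+)";\s*index=(\d*)/i)
  if (!match) return null

  return { uuid: match[1], index: Number(match[2]) }
}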

const createMultipartReply = (readStream, request, reply, cb) => {
Member: IMHO this should just construct and return a stream that can be passed to reply in the handler.

const boundary = content.type(request.headers['content-type']).boundary
Member: Needs try/catch

const ipfs = request.server.app.ipfs
const query = request.query
const parser = new Parser({ boundary: boundary })
const replyStream = new Readable({ read: () => {} })
const serialize = d => JSON.stringify(d) + EOL
Member: I don't think the EOL needs to be OS specific here does it?

const progressHandler = (bytes) => {
replyStream.push(serialize({ Bytes: bytes }))
}
// ipfs add options
const options = {
cidVersion: query['cid-version'],
rawLeaves: query['raw-leaves'],
progress: query.progress ? progressHandler : null,
onlyHash: query['only-hash'],
hashAlg: query.hash,
wrapWithDirectory: query['wrap-with-directory'],
pin: query.pin,
chunker: query.chunker
}
Member: We should extract this logic into a function e.g. queryToAddOptions in http/api/resources/files.js and use it in add and addExperimental and just pass the result of calling it to this function. It'll save us having to keep track of another place where these options are constructed.
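A possible shape for that helper, using the hypothetical name from the comment (a sketch):

const queryToAddOptions = (query, progressHandler) => ({
  cidVersion: query['cid-version'],
  rawLeaves: query['raw-leaves'],
  progress: query.progress ? progressHandler : null,
  onlyHash: query['only-hash'],
  hashAlg: query.hash,
  wrapWithDirectory: query['wrap-with-directory'],
  pin: query.pin,
  chunker: query.chunker
})

// both add and addExperimental would then do:
// const options = queryToAddOptions(request.query, progressHandler)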

const addStream = ipfs.files.addReadableStream(options)

// Setup add pipeline
addStream.on('data', file => {
replyStream.push(serialize({
Name: file.path,
Hash: file.hash,
Size: file.size
}))
Member: This should just be a transform, we should just be able to pipe readStream -> parser -> addStream -> replyStream

})
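A serializing Transform is one step toward the pipe-only pipeline suggested above (a sketch; it would replace the manual 'data' listener, while the parser-to-addStream hop would still need its own adapter):

const { Transform } = require('stream')

const serializer = new Transform({
  writableObjectMode: true,
  transform (file, enc, callback) {
    callback(null, JSON.stringify({
      Name: file.path,
      Hash: file.hash,
      Size: file.size
    }) + '\n')
  }
})

// addStream.pipe(serializer) could then be handed straight to reply()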
addStream.on('end', () => replyStream.push(null))
addStream.on('error', cb)

// Setup multipart parser
parser.on('file', (fileName, fileStream) => {
addStream.write({
path: decodeURIComponent(fileName),
content: fileStream
})
})
parser.on('directory', (directory) => {
addStream.write({
path: decodeURIComponent(directory),
content: ''
})
})
parser.on('end', () => {
addStream.end()
})
parser.on('error', cb)
Member: There should be an abstraction around this that allows us to just pipe it to our add stream.


// Send replyStream to reply
reply(replyStream)
.header('x-chunked-output', '1')
.header('content-encoding', 'identity') // stop gzip from buffering, see https://github.com/hapijs/hapi/issues/2975
.header('content-type', 'application/json')

// start piping data to multipart parser
readStream.pipe(parser)
}
Member: Doesn't look like the callback is called when the reply ends, and so temporary files don't get cleaned up. Would you mind adding a test to verify?
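A minimal way to make the callback fire on the success path, so the chunk files get deleted, could be to hook the end of the reply stream (a sketch; whether 'end' is the right signal here is exactly what the suggested test should confirm):

// inside createMultipartReply, after the other replyStream wiring
replyStream.on('end', () => cb())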


module.exports = {
processAndAdd,
matchMultipartEnd,
parseChunkedInput,
createMultipartReply
}