diff --git a/src/exporter.js b/src/exporter.js
new file mode 100644
index 00000000..779a2c5b
--- /dev/null
+++ b/src/exporter.js
@@ -0,0 +1,124 @@
+'use strict'
+
+const debug = require('debug')
+const log = debug('exporter')
+log.err = debug('exporter:error')
+const UnixFS = require('ipfs-unixfs')
+const async = require('async')
+const events = require('events')
+const Readable = require('stream').Readable
+const pathj = require('path')
+
+exports = module.exports = exporter
+
+function exporter (hash, dagService, options, callback) {
+  if (typeof options === 'function') {
+    callback = options
+    options = {}
+  }
+
+  const ee = new events.EventEmitter()
+  dagService.get(hash, (err, fetchedNode) => {
+    if (err) {
+      if (callback) {
+        return callback(err)
+      }
+      return
+    }
+    const data = UnixFS.unmarshal(fetchedNode.data)
+    const type = data.type
+    if (type === 'directory') {
+      dirExporter(fetchedNode, hash, callback)
+    }
+    if (type === 'file') {
+      fileExporter(fetchedNode, hash, false, callback)
+    }
+  })
+  return ee
+
+  function fileExporter (node, name, dir, callback) {
+    if (typeof dir === 'function') { callback = dir; dir = {} }
+    var rs = new Readable()
+    if (node.links.length === 0) {
+      const unmarshaledData = UnixFS.unmarshal(node.data)
+      ee.emit('file', { stream: rs, path: name, dir: dir })
+      rs.push(unmarshaledData.data)
+      rs.push(null)
+      if (callback) {
+        callback()
+      }
+      return
+    } else {
+      ee.emit('file', { stream: rs, path: name, dir: dir })
+      var init = false
+      rs._read = () => {
+        if (init) {
+          return
+        }
+        init = true
+        async.forEachSeries(node.links, (link, callback) => {
+          dagService.get(link.hash, (err, res) => {
+            if (err) {
+              return callback(err)
+            }
+            var unmarshaledData = UnixFS.unmarshal(res.data)
+            rs.push(unmarshaledData.data)
+            callback()
+          })
+        }, (err) => {
+          if (err) {
+            if (callback) {
+              return callback(err)
+            }
+            return
+          }
+          rs.push(null)
+          if (callback) {
+            callback()
+          }
+          return
+        })
+      }
+    }
+  }
+
+  function dirExporter (node, name, callback) {
+    var rs = new Readable()
+    if (node.links.length === 0) {
+      rs.push(node.data)
+      rs.push(null)
+      ee.emit('file', {stream: rs, path: name})
+      if (callback) {
+        callback()
+      }
+      return
+    } else {
+      async.forEachSeries(node.links, (link, callback) => {
+        dagService.get(link.hash, (err, res) => {
+          if (err) {
+            return callback(err)
+          }
+          var unmarshaledData = UnixFS.unmarshal(res.data)
+          if (unmarshaledData.type === 'file') {
+            return (fileExporter(res, pathj.join(name, link.name), callback))
+          }
+          if (unmarshaledData.type === 'directory') {
+            return (dirExporter(res, pathj.join(name, link.name), callback))
+          }
+          callback()
+        })
+      }, (err) => {
+        if (err) {
+          if (callback) {
+            return callback(err)
+          }
+          return
+        }
+        if (callback) {
+          callback()
+        }
+        return
+      })
+    }
+  }
+}
diff --git a/src/importer.js b/src/importer.js
new file mode 100644
index 00000000..722fd33a
--- /dev/null
+++ b/src/importer.js
@@ -0,0 +1,249 @@
+'use strict'
+
+const debug = require('debug')
+const log = debug('importer')
+log.err = debug('importer:error')
+const fs = require('fs')
+const mDAG = require('ipfs-merkle-dag')
+const FixedSizeChunker = require('./chunker-fixed-size')
+const through2 = require('through2')
+const UnixFS = require('ipfs-unixfs')
+const async = require('async')
+
+exports = module.exports
+
+const CHUNK_SIZE = 262144
+
+// Use a layout + chunkers to convert a directory (or file) to the layout format
+exports.import = (target, dagService, options, callback) => {
+  if (typeof options === 'function') { callback = options; options = {} }
+
+  if (!target) { return callback(new Error('must specify target')) }
+  if (!dagService) { return callback(new Error('must specify dag service')) }
+
+  // options.recursive : follow dirs
+  // options.chunkers : obj with chunkers for each type of data, { default: dumb-chunker }
+
+  options = options || {}
+
+  if (Buffer.isBuffer(target)) {
+    bufferImporter(target, callback)
+  } else if (typeof target.on === 'function') {
+    // TODO Create Stream Importer
+    // streamImporter(options.stream, callback)
+    return callback(new Error('stream importer has not been built yet'))
+  } else if (typeof target === 'string') {
+    var stats
+    try {
+      stats = fs.statSync(target)
+    } catch (e) {
+      return callback(e)
+    }
+    if (stats.isFile()) {
+      fileImporter(target, callback)
+    } else if (stats.isDirectory() && options.recursive) {
+      dirImporter(target, callback)
+    } else {
+      return callback(new Error('recursive must be true to add a directory'))
+    }
+  }
+
+  function fileImporter (path, callback) {
+    const stats = fs.statSync(path)
+    if (stats.size > CHUNK_SIZE) {
+      const links = [] // { Hash: , Size: , Name: }
+      fs.createReadStream(path)
+        .pipe(new FixedSizeChunker(CHUNK_SIZE))
+        .pipe(through2((chunk, enc, cb) => {
+          // TODO: check if this is right (I believe it should be type 'raw')
+          // https://github.com/ipfs/go-ipfs/issues/2331
+          const raw = new UnixFS('file', chunk)
+
+          const node = new mDAG.DAGNode(raw.marshal())
+
+          dagService.add(node, function (err) {
+            if (err) {
+              return cb(err)
+            }
+            links.push({
+              Hash: node.multihash(),
+              Size: node.size(),
+              leafSize: raw.fileSize(),
+              Name: ''
+            })
+            cb()
+          })
+        }, (cb) => {
+          const file = new UnixFS('file')
+          const parentNode = new mDAG.DAGNode()
+          links.forEach((l) => {
+            file.addBlockSize(l.leafSize)
+            const link = new mDAG.DAGLink(l.Name, l.Size, l.Hash)
+            parentNode.addRawLink(link)
+          })
+
+          parentNode.data = file.marshal()
+          dagService.add(parentNode, (err) => {
+            if (err) {
+              return callback(err)
+            }
+
+            const pathSplit = path.split('/')
+            const fileName = pathSplit[pathSplit.length - 1]
+
+            callback(null, {
+              Hash: parentNode.multihash(),
+              Size: parentNode.size(),
+              Name: fileName
+            }) && cb()
+          })
+        }))
+    } else {
+      // create just one file node with the data directly
+      var buf = fs.readFileSync(path)
+      const fileUnixFS = new UnixFS('file', buf)
+      const fileNode = new mDAG.DAGNode(fileUnixFS.marshal())
+
+      dagService.add(fileNode, (err) => {
+        if (err) {
+          return callback(err)
+        }
+
+        const split = path.split('/')
+        const fileName = split[split.length - 1]
+
+        callback(null, {
+          Hash: fileNode.multihash(),
+          Size: fileNode.size(),
+          Name: fileName
+        })
+      })
+    }
+  }
+
+  function dirImporter (path, callback) {
+    const files = fs.readdirSync(path)
+    const dirUnixFS = new UnixFS('directory')
+    const dirNode = new mDAG.DAGNode()
+
+    if (files.length === 0) {
+      dirNode.data = dirUnixFS.marshal()
+      dagService.add(dirNode, (err) => {
+        if (err) {
+          return callback(err)
+        }
+
+        const split = path.split('/')
+        const dirName = split[split.length - 1]
+
+        callback(null, {
+          Hash: dirNode.multihash(),
+          Size: dirNode.size(),
+          Name: dirName
+        })
+      })
+      return
+    }
+
+    async.map(
+      files,
+      (file, cb) => {
+        const filePath = path + '/' + file
+        const stats = fs.statSync(filePath)
+        if (stats.isFile()) {
+          return fileImporter(filePath, cb)
+        } else if (stats.isDirectory()) {
+          return dirImporter(filePath, cb)
+        } else {
+          return cb(new Error('Found a weird file: ' + filePath))
+        }
+      },
+      (err, results) => {
+        if (err) {
+          return callback(err)
+        }
+        results.forEach((result) => {
+          dirNode.addRawLink(new mDAG.DAGLink(result.Name, result.Size, result.Hash))
+        })
+
+        dirNode.data = dirUnixFS.marshal()
+
+        dagService.add(dirNode, (err) => {
+          if (err) {
+            return callback(err)
+          }
+
+          const split = path.split('/')
+          const dirName = split[split.length - 1]
+
+          callback(null, {
+            Hash: dirNode.multihash(),
+            Size: dirNode.size(),
+            Name: dirName
+          })
+        })
+      })
+  }
+  function bufferImporter (buffer, callback) {
+    const links = [] // { Hash: , Size: , Name: }
+    if (buffer.length > CHUNK_SIZE) {
+      var fsc = new FixedSizeChunker(CHUNK_SIZE)
+      fsc.write(buffer)
+      fsc.end()
+      fsc.pipe(through2((chunk, enc, cb) => {
+        // TODO: check if this is right (I believe it should be type 'raw')
+        // https://github.com/ipfs/go-ipfs/issues/2331
+        const raw = new UnixFS('file', chunk)
+        const node = new mDAG.DAGNode(raw.marshal())
+
+        dagService.add(node, function (err) {
+          if (err) {
+            return cb(err)
+          }
+          links.push({
+            Hash: node.multihash(),
+            Size: node.size(),
+            leafSize: raw.fileSize(),
+            Name: ''
+          })
+          cb()
+        })
+      }, (cb) => {
+        const file = new UnixFS('file')
+        const parentNode = new mDAG.DAGNode()
+        links.forEach((l) => {
+          file.addBlockSize(l.leafSize)
+          const link = new mDAG.DAGLink(l.Name, l.Size, l.Hash)
+          parentNode.addRawLink(link)
+        })
+        parentNode.data = file.marshal()
+        dagService.add(parentNode, (err) => {
+          if (err) {
+            return callback(err)
+          }
+
+          callback(null, {
+            Hash: parentNode.multihash(),
+            Size: parentNode.size()
+          }) && cb()
+        })
+      }))
+    } else {
+      // create just one file node with the data directly
+      const fileUnixFS = new UnixFS('file', buffer)
+      const fileNode = new mDAG.DAGNode(fileUnixFS.marshal())
+
+      dagService.add(fileNode, (err) => {
+        if (err) {
+          return callback(err)
+        }
+
+        callback(null, {
+          Hash: fileNode.multihash(),
+          Size: fileNode.size()
+        })
+      })
+    }
+  }
+  // function streamImporter (stream, callback) {}
+}
diff --git a/src/index.js b/src/index.js
index cd60790f..2072ba0b 100644
--- a/src/index.js
+++ b/src/index.js
@@ -1,364 +1,2 @@
-'use strict'
-
-const debug = require('debug')
-const log = debug('importer')
-log.err = debug('importer:error')
-const fs = require('fs')
-const mDAG = require('ipfs-merkle-dag')
-const FixedSizeChunker = require('./chunker-fixed-size')
-const through2 = require('through2')
-const UnixFS = require('ipfs-unixfs')
-const async = require('async')
-const events = require('events')
-const Readable = require('stream').Readable
-const pathj = require('path')
-
-exports = module.exports
-
-const CHUNK_SIZE = 262144
-
-// Use a layout + chunkers to convert a directory (or file) to the layout format
-exports.import = (target, dagService, options, callback) => {
-  if (typeof options === 'function') { callback = options; options = {} }
-
-  if (!target) { return callback(new Error('must specify target')) }
-  if (!dagService) { return callback(new Error('must specify dag service')) }
-
-  // options.recursive : follow dirs
-  // options.chunkers : obj with chunkers to each type of data, { default: dumb-chunker }
-
-  options = options || {}
-
-  if (Buffer.isBuffer(target)) {
-    bufferImporter(target, callback)
-  } else if (typeof target.on === 'function') {
-    // TODO Create Stream Importer
-    // streamImporter(options.stream, callback)
-    return callback(new Error('stream importer has not been built yet'))
-  } else if (typeof target === 'string') {
-    var stats
-    try {
-      stats = fs.statSync(target)
-    } catch (e) {
-      return callback(e)
-    }
-    if (stats.isFile()) {
-      fileImporter(target, callback)
-    } else if (stats.isDirectory() && options.recursive) {
-      dirImporter(target, callback)
-    } else {
-      return callback(new Error('recursive must be true to add a directory'))
-    }
-  }
-
-  function fileImporter (path, callback) {
-    const stats = fs.statSync(path)
-    if (stats.size > CHUNK_SIZE) {
-      const links = [] // { Hash: , Size: , Name: }
-      fs.createReadStream(path)
-        .pipe(new FixedSizeChunker(CHUNK_SIZE))
-        .pipe(through2((chunk, enc, cb) => {
-          // TODO: check if this is right (I believe it should be type 'raw'
-          // https://github.com/ipfs/go-ipfs/issues/2331
-          const raw = new UnixFS('file', chunk)
-
-          const node = new mDAG.DAGNode(raw.marshal())
-
-          dagService.add(node, function (err) {
-            if (err) {
-              return log.err(err)
-            }
-            links.push({
-              Hash: node.multihash(),
-              Size: node.size(),
-              leafSize: raw.fileSize(),
-              Name: ''
-            })
-            cb()
-          })
-        }, (cb) => {
-          const file = new UnixFS('file')
-          const parentNode = new mDAG.DAGNode()
-          links.forEach((l) => {
-            file.addBlockSize(l.leafSize)
-            const link = new mDAG.DAGLink(l.Name, l.Size, l.Hash)
-            parentNode.addRawLink(link)
-          })
-
-          parentNode.data = file.marshal()
-          dagService.add(parentNode, (err) => {
-            if (err) {
-              return log.err(err)
-            }
-
-            const pathSplit = path.split('/')
-            const fileName = pathSplit[pathSplit.length - 1]
-
-            callback(null, {
-              Hash: parentNode.multihash(),
-              Size: parentNode.size(),
-              Name: fileName
-            }) && cb()
-          })
-        }))
-    } else {
-      // create just one file node with the data directly
-      var buf = fs.readFileSync(path)
-      const fileUnixFS = new UnixFS('file', buf)
-      const fileNode = new mDAG.DAGNode(fileUnixFS.marshal())
-
-      dagService.add(fileNode, (err) => {
-        if (err) {
-          return log.err(err)
-        }
-
-        const split = path.split('/')
-        const fileName = split[split.length - 1]
-
-        callback(null, {
-          Hash: fileNode.multihash(),
-          Size: fileNode.size(),
-          Name: fileName
-        })
-      })
-    }
-  }
-
-  function dirImporter (path, callback) {
-    const files = fs.readdirSync(path)
-    const dirUnixFS = new UnixFS('directory')
-    const dirNode = new mDAG.DAGNode()
-
-    if (files.length === 0) {
-      dirNode.data = dirUnixFS.marshal()
-      dagService.add(dirNode, (err) => {
-        if (err) {
-          return callback(err)
-        }
-
-        const split = path.split('/')
-        const dirName = split[split.length - 1]
-
-        callback(null, {
-          Hash: dirNode.multihash(),
-          Size: dirNode.size(),
-          Name: dirName
-        })
-      })
-      return
-    }
-
-    async.map(
-      files,
-      (file, cb) => {
-        const filePath = path + '/' + file
-        const stats = fs.statSync(filePath)
-        if (stats.isFile()) {
-          return fileImporter(filePath, cb)
-        } if (stats.isDirectory()) {
-          return dirImporter(filePath, cb)
-        } else {
-          return cb(new Error('Found a weird file' + path + file))
-        }
-      },
-      (err, results) => {
-        if (err) {
-          return callback(err)
-        }
-        results.forEach((result) => {
-          dirNode.addRawLink(new mDAG.DAGLink(result.Name, result.Size, result.Hash))
-        })
-
-        dirNode.data = dirUnixFS.marshal()
-
-        dagService.add(dirNode, (err) => {
-          if (err) {
-            return callback(err)
-          }
-
-          const split = path.split('/')
-          const dirName = split[split.length - 1]
-
-          callback(null, {
-            Hash: dirNode.multihash(),
-            Size: dirNode.size(),
-            Name: dirName
-          })
-        })
-      })
-  }
-  function bufferImporter (buffer, callback) {
-    const links = [] // { Hash: , Size: , Name: }
-    if (buffer.length > CHUNK_SIZE) {
-      var fsc = new FixedSizeChunker(CHUNK_SIZE)
-      fsc.write(buffer)
-      fsc.end()
-      fsc.pipe(through2((chunk, enc, cb) => {
-        // TODO: check if this is right (I believe it should be type 'raw'
-        // https://github.com/ipfs/go-ipfs/issues/2331
-        const raw = new UnixFS('file', chunk)
-        const node = new mDAG.DAGNode(raw.marshal())
-
-        dagService.add(node, function (err) {
-          if (err) {
-            return log.err(err)
-          }
-          links.push({
-            Hash: node.multihash(),
-            Size: node.size(),
-            leafSize: raw.fileSize(),
-            Name: ''
-          })
-          cb()
-        })
-      }, (cb) => {
-        const file = new UnixFS('file')
-        const parentNode = new mDAG.DAGNode()
-        links.forEach((l) => {
-          file.addBlockSize(l.leafSize)
-          const link = new mDAG.DAGLink(l.Name, l.Size, l.Hash)
-          parentNode.addRawLink(link)
-        })
-        parentNode.data = file.marshal()
-        dagService.add(parentNode, (err) => {
-          if (err) {
-            return log.err(err)
-          }
-
-          callback(null, {
-            Hash: parentNode.multihash(),
-            Size: parentNode.size()
-          }) && cb()
-        })
-      }))
-    } else {
-      // create just one file node with the data directly
-      const fileUnixFS = new UnixFS('file', buffer)
-      const fileNode = new mDAG.DAGNode(fileUnixFS.marshal())
-
-      dagService.add(fileNode, (err) => {
-        if (err) {
-          return log.err(err)
-        }
-
-        callback(null, {
-          Hash: fileNode.multihash(),
-          Size: fileNode.size()
-        })
-      })
-    }
-  }
-  // function streamImporter (stream, callback) {}
-}
-
-exports.export = function (hash, dagService, options, callback) {
-  if (typeof options === 'function') {
-    callback = options
-    options = {}
-  }
-
-  const ee = new events.EventEmitter()
-  dagService.get(hash, (err, fetchedNode) => {
-    if (err) {
-      if (callback) {
-        return callback(err)
-      }
-      return
-    }
-    const data = UnixFS.unmarshal(fetchedNode.data)
-    const type = data.type
-    if (type === 'directory') {
-      dirExporter(fetchedNode, hash, callback)
-    }
-    if (type === 'file') {
-      fileExporter(fetchedNode, hash, false, callback)
-    }
-  })
-  return ee
-
-  function fileExporter (node, name, dir, callback) {
-    if (typeof dir === 'function') { callback = dir; dir = {} }
-    var rs = new Readable()
-    if (node.links.length === 0) {
-      const unmarshaledData = UnixFS.unmarshal(node.data)
-      ee.emit('file', { stream: rs, path: name, dir: dir })
-      rs.push(unmarshaledData.data)
-      rs.push(null)
-      if (callback) {
-        callback()
-      }
-      return
-    } else {
-      ee.emit('file', { stream: rs, path: name, dir: dir })
-      var init = false
-      rs._read = () => {
-        if (init) {
-          return
-        }
-        init = true
-        async.forEachSeries(node.links, (link, callback) => {
-          dagService.get(link.hash, (err, res) => {
-            if (err) {
-              callback(err)
-            }
-            var unmarshaledData = UnixFS.unmarshal(res.data)
-            rs.push(unmarshaledData.data)
-            callback()
-          })
-        }, (err) => {
-          if (err) {
-            if (callback) {
-              return callback(err)
-            }
-            return
-          }
-          rs.push(null)
-          if (callback) {
-            callback()
-          }
-          return
-        })
-      }
-    }
-  }
-
-  function dirExporter (node, name, callback) {
-    var rs = new Readable()
-    if (node.links.length === 0) {
-      rs.push(node.data)
-      rs.push(null)
-      ee.emit('file', {stream: rs, path: name})
-      if (callback) {
-        callback()
-      }
-      return
-    } else {
-      async.forEachSeries(node.links, (link, callback) => {
-        dagService.get(link.hash, (err, res) => {
-          if (err) {
-            callback(err)
-          }
-          var unmarshaledData = UnixFS.unmarshal(res.data)
-          if (unmarshaledData.type === 'file') {
-            return (fileExporter(res, pathj.join(name, link.name), callback))
-          }
-          if (unmarshaledData.type === 'directory') {
-            return (dirExporter(res, pathj.join(name, link.name), callback))
-          }
-          callback()
-        })
-      }, (err) => {
-        if (err) {
-          if (callback) {
-            return callback(err)
-          }
-          return
-        }
-        if (callback) {
-          callback()
-        }
-        return
-      })
-    }
-  }
-}
+exports.importer = require('./importer.js')
+exports.exporter = require('./exporter.js')
diff --git a/test/buffer-test.js b/test/buffer-test.js
index 4f7e02e1..b057ff4d 100644
--- a/test/buffer-test.js
+++ b/test/buffer-test.js
@@ -1,7 +1,9 @@
 /* eslint-env mocha */
 'use strict'
 
-const importer = require('./../src')
+const unixFSEngine = require('./../src')
+const importer = unixFSEngine.importer
+const exporter = unixFSEngine.exporter
 const BlockService = require('ipfs-blocks').BlockService
 const DAGService = require('ipfs-merkle-dag').DAGService
 const DAGNode = require('ipfs-merkle-dag').DAGNode
@@ -87,7 +89,7 @@ module.exports = function (repo) {
     const hash = 'QmQmZQxSKQppbsWfVzBvg59Cn3DKtsNVQ94bjAxg2h3Lb8'
     const bs = new BlockService(repo)
     const ds = new DAGService(bs)
-    const testExport = importer.export(hash, ds)
+    const testExport = exporter(hash, ds)
     testExport.on('file', (data) => {
       ds.get(hash, (err, fetchedNode) => {
         expect(err).to.not.exist
@@ -102,7 +104,7 @@ module.exports = function (repo) {
     const hash = 'QmW7BDxEbGqxxSYVtn3peNPQgdDXbWkoQ6J1EFYAEuQV3Q'
     const bs = new BlockService(repo)
     const ds = new DAGService(bs)
-    const testExport = importer.export(hash, ds)
+    const testExport = exporter(hash, ds)
     testExport.on('file', (data) => {
       expect(data.stream).to.exist
       done()
@@ -113,7 +115,7 @@ module.exports = function (repo) {
     const hash = 'QmRQgufjp9vLE8XK2LGKZSsPCFCF6e4iynCQtNB5X2HBKE'
     const bs = new BlockService(repo)
     const ds = new DAGService(bs)
-    const testExport = importer.export(hash, ds)
+    const testExport = exporter(hash, ds)
     testExport.on('file', (data) => {
       expect(data.stream).to.exist
       done()
@@ -124,7 +126,7 @@ module.exports = function (repo) {
     const hash = 'QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN'
     const bs = new BlockService(repo)
     const ds = new DAGService(bs)
-    const testExport = importer.export(hash, ds)
+    const testExport = exporter(hash, ds)
     var fs = []
     testExport.on('file', (data) => {
       fs.push(data)
diff --git a/test/node.js b/test/node.js
index 4fa6eb66..94d254b6 100644
--- a/test/node.js
+++ b/test/node.js
@@ -1,4 +1,4 @@
-/* globals describe, before, after */
+/* eslint-env mocha */
 'use strict'
 
diff --git a/test/test-import.js b/test/test-unixfs-engine.js
similarity index 97%
rename from test/test-import.js
rename to test/test-unixfs-engine.js
index d98d2e16..4d563e9e 100644
--- a/test/test-import.js
+++ b/test/test-unixfs-engine.js
@@ -1,7 +1,9 @@
 /* eslint-env mocha */
 'use strict'
 
-const importer = require('./../src')
+const unixFSEngine = require('./../src')
+const importer = unixFSEngine.importer
+const exporter = unixFSEngine.exporter
 const expect = require('chai').expect
 const IPFSRepo = require('ipfs-repo')
 const BlockService = require('ipfs-blocks').BlockService
@@ -269,7 +271,7 @@ describe('layout: importer', function () {
 describe('layout: exporter', function () {
   it('export a file with no links', (done) => {
     const hash = 'QmQmZQxSKQppbsWfVzBvg59Cn3DKtsNVQ94bjAxg2h3Lb8'
-    const testExport = importer.export(hash, ds)
+    const testExport = exporter(hash, ds)
     testExport.on('file', (data) => {
       ds.get(hash, (err, fetchedNode) => {
         expect(err).to.not.exist
@@ -282,7 +284,7 @@ it('export a small file with links', (done) => {
     const hash = 'QmW7BDxEbGqxxSYVtn3peNPQgdDXbWkoQ6J1EFYAEuQV3Q'
-    const testExport = importer.export(hash, ds)
+    const testExport = exporter(hash, ds)
     testExport.on('file', (data) => {
       var ws = fs.createWriteStream(path.join(process.cwd(), '/test', data.path))
      data.stream.pipe(ws)
@@ -297,7 +299,7 @@ describe('layout: exporter', function () {
   it('export a large file > 5mb', (done) => {
     const hash = 'QmRQgufjp9vLE8XK2LGKZSsPCFCF6e4iynCQtNB5X2HBKE'
-    const testExport = importer.export(hash, ds)
+    const testExport = exporter(hash, ds)
     testExport.on('file', (data) => {
       var ws = fs.createWriteStream(path.join(process.cwd(), '/test', data.path))
       data.stream.pipe(ws)
@@ -312,7 +314,7 @@ describe('layout: exporter', function () {
   it('export a directory', (done) => {
     const hash = 'QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN'
-    var testExport = importer.export(hash, ds)
+    var testExport = exporter(hash, ds)
     var fs = []
     var x = 0
     testExport.on('file', (data) => {
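
Reviewer note: the snippet below is a usage sketch of the split API, not part of the patch. It wires things up the same way as test/test-unixfs-engine.js (ipfs-repo -> ipfs-blocks -> ipfs-merkle-dag); the repo path is illustrative, and it assumes dagService.get accepts the multihash buffer returned by the importer the same way it accepts the base58 hash strings used in the tests above.

'use strict'

// Sketch: import a buffer, then export it back with the new entry points.
const unixFSEngine = require('./src')
const BlockService = require('ipfs-blocks').BlockService
const DAGService = require('ipfs-merkle-dag').DAGService
const IPFSRepo = require('ipfs-repo')

// Same wiring as the tests: repo -> block service -> DAG service.
// The repo path here is illustrative.
const repo = new IPFSRepo(process.cwd() + '/test/example-repo')
const ds = new DAGService(new BlockService(repo))

// What used to be require('./src').import(...) is now importer.import(...),
// with the same (target, dagService, callback) shape.
unixFSEngine.importer.import(new Buffer('hello world'), ds, (err, stat) => {
  if (err) { throw err }
  console.log('added:', stat.Hash, stat.Size)

  // What used to be importer.export(hash, ds) is now exporter(hash, ds).
  // It still returns an EventEmitter that emits one 'file' event per file,
  // each carrying { stream, path }.
  const testExport = unixFSEngine.exporter(stat.Hash, ds)
  testExport.on('file', (file) => {
    file.stream.pipe(process.stdout)
  })
})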