From 007aae1510003af3f396ed10a5f857fbc7a8ed46 Mon Sep 17 00:00:00 2001 From: Blaine Bublitz Date: Thu, 5 Nov 2015 16:57:19 -0600 Subject: [PATCH] Fix: Sink the saveStream on nextTick to start flowing and avoid highWaterMark --- lib/dest/index.js | 10 +++++++ lib/sink.js | 16 ++++++++++ package.json | 2 ++ test/dest.js | 76 +++++++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 101 insertions(+), 3 deletions(-) create mode 100644 lib/sink.js diff --git a/lib/dest/index.js b/lib/dest/index.js index ecc6ad7b..81b5417a 100644 --- a/lib/dest/index.js +++ b/lib/dest/index.js @@ -3,6 +3,7 @@ var through2 = require('through2'); var sourcemaps = require('gulp-sourcemaps'); var duplexify = require('duplexify'); +var sink = require('../sink'); var prepareWrite = require('../prepareWrite'); var writeContents = require('./writeContents'); @@ -21,7 +22,12 @@ function dest(outFolder, opt) { } var saveStream = through2.obj(saveFile); + if (!opt.sourcemaps) { + // Sink the save stream to start flowing + // Do this on nextTick, it will flow at slowest speed of piped streams + process.nextTick(sink(saveStream)); + return saveStream; } @@ -34,6 +40,10 @@ function dest(outFolder, opt) { var outputStream = duplexify.obj(mapStream, saveStream); mapStream.pipe(saveStream); + // Sink the output stream to start flowing + // Do this on nextTick, it will flow at slowest speed of piped streams + process.nextTick(sink(outputStream)); + return outputStream; } diff --git a/lib/sink.js b/lib/sink.js new file mode 100644 index 00000000..6a3f4641 --- /dev/null +++ b/lib/sink.js @@ -0,0 +1,16 @@ +'use strict'; + +var Writable = require('readable-stream/writable'); + +function sink(stream) { + var sinkStream = new Writable({ + objectMode: true, + write: function(file, enc, cb) { cb(); } + }); + + return function() { + stream.pipe(sinkStream); + }; +} + +module.exports = sink; diff --git a/package.json b/package.json index 4be1a480..369e79aa 100644 --- a/package.json +++ b/package.json @@ -19,6 +19,7 @@ "merge-stream": "^1.0.0", "mkdirp": "^0.5.0", "object-assign": "^4.0.0", + "readable-stream": "^2.0.4", "strip-bom": "^2.0.0", "strip-bom-stream": "^1.0.0", "through2": "^2.0.0", @@ -27,6 +28,7 @@ }, "devDependencies": { "buffer-equal": "^0.0.1", + "del": "^2.2.0", "istanbul": "^0.3.0", "istanbul-coveralls": "^1.0.1", "jshint": "^2.4.1", diff --git a/test/dest.js b/test/dest.js index d094ec04..ce12ba32 100644 --- a/test/dest.js +++ b/test/dest.js @@ -8,7 +8,8 @@ var vfs = require('../'); var path = require('path'); var fs = require('graceful-fs'); -var rimraf = require('rimraf'); +var del = require('del'); +var Writeable = require('readable-stream/writable'); var bufEqual = require('buffer-equal'); var through = require('through2'); @@ -17,11 +18,12 @@ var File = require('vinyl'); var should = require('should'); require('mocha'); -var wipeOut = function(cb) { - rimraf(path.join(__dirname, './out-fixtures/'), cb); +var wipeOut = function() { spies.setError('false'); statSpy.reset(); chmodSpy.reset(); + del.sync(path.join(__dirname, './fixtures/highwatermark')); + del.sync(path.join(__dirname, './out-fixtures/')); }; var dataWrap = function(fn) { @@ -1256,4 +1258,72 @@ describe('dest stream', function() { stream.write(file); stream.end(); }); + + it('does not get clogged by highWaterMark', function(done) { + fs.mkdirSync(path.join(__dirname, './fixtures/highwatermark')); + for (var idx = 0; idx < 17; idx++) { + fs.writeFileSync(path.join(__dirname, './fixtures/highwatermark/', 'file' + idx + '.txt')); + } + + var srcPath = path.join(__dirname, './fixtures/highwatermark/*.txt'); + var srcStream = vfs.src(srcPath); + var destStream = vfs.dest('./out-fixtures/', {cwd: __dirname}); + + var fileCount = 0; + var countFiles = through.obj(function(file, enc, cb) { + fileCount++; + + cb(null, file); + }); + + destStream.once('finish', function() { + fileCount.should.equal(17); + done(); + }); + + srcStream.pipe(countFiles).pipe(destStream); + }); + + it('allows backpressure when piped to another, slower stream', function(done) { + this.timeout(20000); + + fs.mkdirSync(path.join(__dirname, './fixtures/highwatermark')); + for (var idx = 0; idx < 24; idx++) { + fs.writeFileSync(path.join(__dirname, './fixtures/highwatermark/', 'file' + idx + '.txt')); + } + + var srcPath = path.join(__dirname, './fixtures/highwatermark/*.txt'); + var srcStream = vfs.src(srcPath); + var destStream = vfs.dest('./out-fixtures/', {cwd: __dirname}); + + var fileCount = 0; + var countFiles = through.obj(function(file, enc, cb) { + fileCount++; + + cb(null, file); + }); + + var slowFileCount = 0; + var slowCountFiles = new Writeable({ + objectMode: true, + write: function(file, enc, cb){ + slowFileCount++; + + setTimeout(function() { + cb(null, file); + }, 250); + } + }); + + slowCountFiles.once('finish', function() { + fileCount.should.equal(24); + slowFileCount.should.equal(24); + done(); + }); + + srcStream + .pipe(countFiles) + .pipe(destStream) + .pipe(slowCountFiles); + }); });