From 19e5d479b90ee8340f099e4d93b7f0c3aee74da9 Mon Sep 17 00:00:00 2001 From: Calvin Metcalf Date: Tue, 16 May 2017 11:08:49 -0400 Subject: [PATCH 1/7] streams: add flow and buffer properties to streams This adds computed properties to readable and writable streams to allow access to the readable buffer, the writable buffer, and flow state without accessing the readable or writable state. An underscored method is also added to allow internal function to set the flow state. These are the only uses of readable and writable state in the docs so adding these work arounds allows them to be removed from the docs. Part of the readableState/writableState mega issue #445. --- lib/_stream_duplex.js | 10 +++++++ lib/_stream_readable.js | 26 +++++++++++++++++++ lib/_stream_writable.js | 6 +++++ test/parallel/test-stream-push-order.js | 2 +- ...est-stream-readable-reading-readingMore.js | 2 +- test/parallel/test-stream2-transform.js | 2 +- test/parallel/test-stream2-unpipe-leak.js | 2 +- test/parallel/test-stream3-pause-then-read.js | 2 +- 8 files changed, 47 insertions(+), 5 deletions(-) diff --git a/lib/_stream_duplex.js b/lib/_stream_duplex.js index 05f649340845b0..e99d246396f6cd 100644 --- a/lib/_stream_duplex.js +++ b/lib/_stream_duplex.js @@ -74,6 +74,16 @@ Object.defineProperty(Duplex.prototype, 'writableHighWaterMark', { } }); +Object.defineProperty(Duplex.prototype, 'writableBuffer', { + // making it explicit this property is not enumerable + // because otherwise some prototype manipulation in + // userland will fail + enumerable: false, + get: function() { + return this._writableState && this._writableState.getBuffer(); + } +}); + // the no-half-open enforcer function onend() { // if we allow half-open state, or if the writable side ended, diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 452b7321b2c480..cbed8058c3af11 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -926,6 +926,32 @@ Object.defineProperty(Readable.prototype, 'readableHighWaterMark', { } }); +Object.defineProperty(Readable.prototype, 'readableBuffer', { + // making it explicit this property is not enumerable + // because otherwise some prototype manipulation in + // userland will fail + enumerable: false, + get: function() { + return this._readableState && this._readableState.buffer; + } +}); + +Object.defineProperty(Readable.prototype, 'readableFlowing', { + // making it explicit this property is not enumerable + // because otherwise some prototype manipulation in + // userland will fail + enumerable: false, + get: function() { + return this._readableState.flowing; + } +}); + +Readable.prototype._setFlowing = function(state) { + if (this._readableState) { + this._readableState.flowing = state; + } +}; + // exposed for testing purposes only. Readable._fromList = fromList; diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 1735fafb5f34d3..ab719a6b1ea122 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -325,6 +325,12 @@ Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) { return this; }; +Object.defineProperty(Writable.prototype, 'writableBuffer', { + get: function() { + return this._writableState && this._writableState.getBuffer(); + } +}); + function decodeChunk(state, chunk, encoding) { if (!state.objectMode && state.decodeStrings !== false && diff --git a/test/parallel/test-stream-push-order.js b/test/parallel/test-stream-push-order.js index be2db9f44a6691..ce4f336b0254d5 100644 --- a/test/parallel/test-stream-push-order.js +++ b/test/parallel/test-stream-push-order.js @@ -47,6 +47,6 @@ s.read(0); // ACTUALLY [1, 3, 5, 6, 4, 2] process.on('exit', function() { - assert.deepStrictEqual(s._readableState.buffer.join(','), '1,2,3,4,5,6'); + assert.deepStrictEqual(s.readableBuffer.join(','), '1,2,3,4,5,6'); console.log('ok'); }); diff --git a/test/parallel/test-stream-readable-reading-readingMore.js b/test/parallel/test-stream-readable-reading-readingMore.js index bee3a1c82a8678..e31d2dd921ce5b 100644 --- a/test/parallel/test-stream-readable-reading-readingMore.js +++ b/test/parallel/test-stream-readable-reading-readingMore.js @@ -15,7 +15,7 @@ assert.strictEqual(state.readingMore, false); readable.on('data', common.mustCall((data) => { // while in a flowing state, should try to read more. - if (state.flowing) + if (readable.readableFlowing) assert.strictEqual(state.readingMore, true); // reading as long as we've not ended diff --git a/test/parallel/test-stream2-transform.js b/test/parallel/test-stream2-transform.js index 0f12476f506a93..1a58ad4948daca 100644 --- a/test/parallel/test-stream2-transform.js +++ b/test/parallel/test-stream2-transform.js @@ -46,7 +46,7 @@ const Transform = require('_stream_transform'); assert.strictEqual(tx._readableState.length, 10); assert.strictEqual(transformed, 10); assert.strictEqual(tx._transformState.writechunk.length, 5); - assert.deepStrictEqual(tx._writableState.getBuffer().map(function(c) { + assert.deepStrictEqual(tx.writableBuffer.map(function(c) { return c.chunk.length; }), [6, 7, 8, 9, 10]); } diff --git a/test/parallel/test-stream2-unpipe-leak.js b/test/parallel/test-stream2-unpipe-leak.js index cc331d58217e25..5c19be061fd54c 100644 --- a/test/parallel/test-stream2-unpipe-leak.js +++ b/test/parallel/test-stream2-unpipe-leak.js @@ -66,7 +66,7 @@ assert.strictEqual(dest.listeners('finish').length, 0); console.error(src._readableState); process.on('exit', function() { - src._readableState.buffer.length = 0; + src.readableBuffer.length = 0; console.error(src._readableState); assert(src._readableState.length >= src.readableHighWaterMark); console.log('ok'); diff --git a/test/parallel/test-stream3-pause-then-read.js b/test/parallel/test-stream3-pause-then-read.js index d75fe697081b32..f7bfadaf9d124c 100644 --- a/test/parallel/test-stream3-pause-then-read.js +++ b/test/parallel/test-stream3-pause-then-read.js @@ -68,7 +68,7 @@ function readn(n, then) { r.once('readable', read); else { assert.strictEqual(c.length, n); - assert(!r._readableState.flowing); + assert(!r.readableFlowing); then(); } })(); From 5d5ea319df75c6fbfa3b57e5fe5d724e20bf8d57 Mon Sep 17 00:00:00 2001 From: Calvin Metcalf Date: Tue, 16 May 2017 11:09:31 -0400 Subject: [PATCH 2/7] docs: remove readable and writable state This changes the docs to use the new readableBuffer, writableBuffer, and flow properties instead of manipulating the readable and writable state. Part of the readableState/writableState mega issue #445. --- doc/api/stream.md | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 25a218cf130728..880a9231514b76 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -63,8 +63,8 @@ object mode is not safe. Both [Writable][] and [Readable][] streams will store data in an internal -buffer that can be retrieved using `writable._writableState.getBuffer()` or -`readable._readableState.buffer`, respectively. +buffer that can be retrieved using `writable.writableBuffer` or +`readable.readableBuffer`, respectively. The amount of data potentially buffered depends on the `highWaterMark` option passed into the streams constructor. For normal streams, the `highWaterMark` @@ -602,22 +602,22 @@ Readable stream implementation. Specifically, at any given point in time, every Readable is in one of three possible states: -* `readable._readableState.flowing = null` -* `readable._readableState.flowing = false` -* `readable._readableState.flowing = true` +* `readable.readableFlowing = null` +* `readable.readableFlowing = false` +* `readable.readableFlowing = true` -When `readable._readableState.flowing` is `null`, no mechanism for consuming the +When `readable.readableFlowing` is `null`, no mechanism for consuming the streams data is provided so the stream will not generate its data. While in this state, attaching a listener for the `'data'` event, calling the `readable.pipe()` method, or calling the `readable.resume()` method will switch -`readable._readableState.flowing` to `true`, causing the Readable to begin +`readable.readableFlowing` to `true`, causing the Readable to begin actively emitting events as data is generated. Calling `readable.pause()`, `readable.unpipe()`, or receiving "back pressure" -will cause the `readable._readableState.flowing` to be set as `false`, +will cause the `readable.readableFlowing` to be set as `false`, temporarily halting the flowing of events but *not* halting the generation of data. While in this state, attaching a listener for the `'data'` event -would not cause `readable._readableState.flowing` to switch to `true`. +would not cause `readable.flowing` to switch to `true`. ```js const { PassThrough, Writable } = require('stream'); @@ -626,14 +626,14 @@ const writable = new Writable(); pass.pipe(writable); pass.unpipe(writable); -// flowing is now false +// readableFlowing is now false pass.on('data', (chunk) => { console.log(chunk.toString()); }); pass.write('ok'); // will not emit 'data' pass.resume(); // must be called to make 'data' being emitted ``` -While `readable._readableState.flowing` is `false`, data may be accumulating +While `readable.readabelFlowing` is `false`, data may be accumulating within the streams internal buffer. #### Choose One From d9743754a47cf32f1bd305586d4fd802b154679a Mon Sep 17 00:00:00 2001 From: Calvin Metcalf Date: Tue, 16 May 2017 11:10:28 -0400 Subject: [PATCH 3/7] http: use new internal method to set stream flow This updates http_client and http_server to use the new _setFlowing method to set the flowing state instead of directly manipulating readable state. Part of the readableState/writableState mega issue #445. --- benchmark/http/upgrade.js | 53 +++++++++++++++++++++++++++++++++++++++ lib/_http_client.js | 4 +-- lib/_http_server.js | 4 +-- 3 files changed, 55 insertions(+), 6 deletions(-) create mode 100644 benchmark/http/upgrade.js diff --git a/benchmark/http/upgrade.js b/benchmark/http/upgrade.js new file mode 100644 index 00000000000000..1926d7f4c20d3e --- /dev/null +++ b/benchmark/http/upgrade.js @@ -0,0 +1,53 @@ +'use strict'; + +const common = require('../common.js'); +const PORT = common.PORT; +const net = require('net'); + +const bench = common.createBenchmark(main, { + n: [5, 1000] +}); + +const reqData = 'GET / HTTP/1.1\r\n' + + 'Upgrade: WebSocket\r\n' + + 'Connection: Upgrade\r\n' + + '\r\n' + + 'WjN}|M(6'; + +const resData = 'HTTP/1.1 101 Web Socket Protocol Handshake\r\n' + + 'Upgrade: WebSocket\r\n' + + 'Connection: Upgrade\r\n' + + '\r\n\r\n'; + +function main(conf) { + process.env.PORT = PORT; + var server = require('../fixtures/simple-http-server.js') + .listen(process.env.PORT || common.PORT) + .on('listening', function() { + bench.start(); + doBench(server.address(), +conf.n, function() { + bench.end(+conf.n); + server.close(); + }); + }) + .on('upgrade', function(req, socket, upgradeHead) { + socket.resume(); + socket.write(resData); + socket.end(); + }); +} + +function doBench(address, count, done) { + if (count === 0) { + done(); + return; + } + + const conn = net.createConnection(address.port); + conn.write(reqData); + conn.resume(); + + conn.on('end', function() { + doBench(address, count - 1, done); + }); +} diff --git a/lib/_http_client.js b/lib/_http_client.js index 366a7b0712879c..9b6773533c050b 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -464,9 +464,7 @@ function socketOnData(d) { socket.removeListener('close', socketCloseListener); socket.removeListener('error', socketErrorListener); - // TODO(isaacs): Need a way to reset a stream to fresh state - // IE, not flowing, and not explicitly paused. - socket._readableState.flowing = null; + socket._setFlowing(null); req.emit(eventName, res, socket, bodyHead); req.emit('close'); diff --git a/lib/_http_server.js b/lib/_http_server.js index 7315a266b890f0..c0d99a51affabe 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -502,9 +502,7 @@ function onParserExecuteCommon(server, socket, parser, state, ret, d) { debug('SERVER have listener for %s', eventName); var bodyHead = d.slice(bytesParsed, d.length); - // TODO(isaacs): Need a way to reset a stream to fresh state - // IE, not flowing, and not explicitly paused. - socket._readableState.flowing = null; + socket._setFlowing(null); server.emit(eventName, req, socket, bodyHead); } else { // Got upgrade header or CONNECT method, but have no handler. From b3c8f34ca520e18f8d9dcd20bdaac2374136bf91 Mon Sep 17 00:00:00 2001 From: Calvin Metcalf Date: Tue, 16 May 2017 11:10:48 -0400 Subject: [PATCH 4/7] net: avoid 2 usages of internal stream state This uses the new internal method _setFlowing to avoid a manipulation of readable state and uses writableBuffer instead of the getBuffer method of the writable state. Part of the readableState/writableState mega issue #445 --- lib/net.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/net.js b/lib/net.js index 93b2279c7b167e..24089a2f5353fd 100644 --- a/lib/net.js +++ b/lib/net.js @@ -244,7 +244,7 @@ function Socket(options) { // stop the handle from reading and pause the stream this._handle.reading = false; this._handle.readStop(); - this._readableState.flowing = false; + this._setFlowing(false); } else if (!options.manualStart) { this.read(0); } @@ -829,7 +829,7 @@ protoGetter('bytesWritten', function bytesWritten() { if (!state) return undefined; - state.getBuffer().forEach(function(el) { + this.writableBuffer.forEach(function(el) { if (el.chunk instanceof Buffer) bytes += el.chunk.length; else From d002892e82cb77abaef69d4f0cd9d2bba676a56a Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 16 Nov 2017 15:59:27 +0100 Subject: [PATCH 5/7] squash: address doc nits --- doc/api/stream.md | 4 ++-- lib/_stream_writable.js | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 880a9231514b76..e655401671ae7d 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -617,7 +617,7 @@ Calling `readable.pause()`, `readable.unpipe()`, or receiving "back pressure" will cause the `readable.readableFlowing` to be set as `false`, temporarily halting the flowing of events but *not* halting the generation of data. While in this state, attaching a listener for the `'data'` event -would not cause `readable.flowing` to switch to `true`. +would not cause `readable.readableFlowing` to switch to `true`. ```js const { PassThrough, Writable } = require('stream'); @@ -633,7 +633,7 @@ pass.write('ok'); // will not emit 'data' pass.resume(); // must be called to make 'data' being emitted ``` -While `readable.readabelFlowing` is `false`, data may be accumulating +While `readable.readableFlowing` is `false`, data may be accumulating within the streams internal buffer. #### Choose One diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index ab719a6b1ea122..850e4bf5bbe354 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -326,6 +326,10 @@ Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) { }; Object.defineProperty(Writable.prototype, 'writableBuffer', { + // making it explicit this property is not enumerable + // because otherwise some prototype manipulation in + // userland will fail + enumerable: false, get: function() { return this._writableState && this._writableState.getBuffer(); } From 589910a8a7d2f09fc3cc783ca7f3a4ae34201e78 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 11 Dec 2017 13:27:03 +0100 Subject: [PATCH 6/7] squash: renamed _setFlowing to readableFlowing setter --- lib/_http_client.js | 2 +- lib/_http_server.js | 2 +- lib/_stream_readable.js | 11 +++++------ lib/net.js | 2 +- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/lib/_http_client.js b/lib/_http_client.js index 9b6773533c050b..4f9ab525d29793 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -464,7 +464,7 @@ function socketOnData(d) { socket.removeListener('close', socketCloseListener); socket.removeListener('error', socketErrorListener); - socket._setFlowing(null); + socket.readableFlowing = null; req.emit(eventName, res, socket, bodyHead); req.emit('close'); diff --git a/lib/_http_server.js b/lib/_http_server.js index c0d99a51affabe..e90fa55b030ddc 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -502,7 +502,7 @@ function onParserExecuteCommon(server, socket, parser, state, ret, d) { debug('SERVER have listener for %s', eventName); var bodyHead = d.slice(bytesParsed, d.length); - socket._setFlowing(null); + socket.readableFlowing = null; server.emit(eventName, req, socket, bodyHead); } else { // Got upgrade header or CONNECT method, but have no handler. diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index cbed8058c3af11..b1841c709366a9 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -943,15 +943,14 @@ Object.defineProperty(Readable.prototype, 'readableFlowing', { enumerable: false, get: function() { return this._readableState.flowing; + }, + set: function(state) { + if (this._readableState) { + this._readableState.flowing = state; + } } }); -Readable.prototype._setFlowing = function(state) { - if (this._readableState) { - this._readableState.flowing = state; - } -}; - // exposed for testing purposes only. Readable._fromList = fromList; diff --git a/lib/net.js b/lib/net.js index 24089a2f5353fd..0cd95c28168ae7 100644 --- a/lib/net.js +++ b/lib/net.js @@ -244,7 +244,7 @@ function Socket(options) { // stop the handle from reading and pause the stream this._handle.reading = false; this._handle.readStop(); - this._setFlowing(false); + this.readableFlowing = false; } else if (!options.manualStart) { this.read(0); } From 1be51534266edc79d839009a77a35310249a5cc0 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 11 Dec 2017 13:28:58 +0100 Subject: [PATCH 7/7] squash: address nit on benchmark --- benchmark/http/upgrade.js | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/benchmark/http/upgrade.js b/benchmark/http/upgrade.js index 1926d7f4c20d3e..0feaecc8ff19e6 100644 --- a/benchmark/http/upgrade.js +++ b/benchmark/http/upgrade.js @@ -19,22 +19,22 @@ const resData = 'HTTP/1.1 101 Web Socket Protocol Handshake\r\n' + 'Connection: Upgrade\r\n' + '\r\n\r\n'; -function main(conf) { +function main({ n }) { process.env.PORT = PORT; var server = require('../fixtures/simple-http-server.js') - .listen(process.env.PORT || common.PORT) - .on('listening', function() { - bench.start(); - doBench(server.address(), +conf.n, function() { - bench.end(+conf.n); - server.close(); + .listen(common.PORT) + .on('listening', function() { + bench.start(); + doBench(server.address(), n, function() { + bench.end(n); + server.close(); + }); + }) + .on('upgrade', function(req, socket, upgradeHead) { + socket.resume(); + socket.write(resData); + socket.end(); }); - }) - .on('upgrade', function(req, socket, upgradeHead) { - socket.resume(); - socket.write(resData); - socket.end(); - }); } function doBench(address, count, done) {