diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js index eaba8ebfa8320d..10a0cf57e7789e 100644 --- a/lib/_stream_wrap.js +++ b/lib/_stream_wrap.js @@ -1,223 +1,3 @@ 'use strict'; -const assert = require('assert'); -const util = require('util'); -const { Socket } = require('net'); -const { JSStream } = process.binding('js_stream'); -const uv = process.binding('uv'); -const debug = util.debuglog('stream_wrap'); -const errors = require('internal/errors'); - -function StreamWrap(stream) { - const handle = new JSStream(); - - this.stream = stream; - - this._list = null; - - const self = this; - handle.close = function(cb) { - debug('close'); - self.doClose(cb); - }; - handle.isAlive = function() { - return self.isAlive(); - }; - handle.isClosing = function() { - return self.isClosing(); - }; - handle.onreadstart = function() { - return self.readStart(); - }; - handle.onreadstop = function() { - return self.readStop(); - }; - handle.onshutdown = function(req) { - return self.doShutdown(req); - }; - handle.onwrite = function(req, bufs) { - return self.doWrite(req, bufs); - }; - - this.stream.pause(); - this.stream.on('error', function onerror(err) { - self.emit('error', err); - }); - this.stream.on('data', function ondata(chunk) { - if (typeof chunk === 'string' || this._readableState.objectMode === true) { - // Make sure that no further `data` events will happen - this.pause(); - this.removeListener('data', ondata); - - self.emit('error', new errors.Error('ERR_STREAM_WRAP')); - return; - } - - debug('data', chunk.length); - if (self._handle) - self._handle.readBuffer(chunk); - }); - this.stream.once('end', function onend() { - debug('end'); - if (self._handle) - self._handle.emitEOF(); - }); - - Socket.call(this, { - handle: handle - }); -} -util.inherits(StreamWrap, Socket); -module.exports = StreamWrap; - -// require('_stream_wrap').StreamWrap -StreamWrap.StreamWrap = StreamWrap; - -StreamWrap.prototype.isAlive = function isAlive() { - return true; -}; - -StreamWrap.prototype.isClosing = function isClosing() { - return !this.readable || !this.writable; -}; - -StreamWrap.prototype.readStart = function readStart() { - this.stream.resume(); - return 0; -}; - -StreamWrap.prototype.readStop = function readStop() { - this.stream.pause(); - return 0; -}; - -StreamWrap.prototype.doShutdown = function doShutdown(req) { - const self = this; - const handle = this._handle; - const item = this._enqueue('shutdown', req); - - this.stream.end(function() { - // Ensure that write was dispatched - setImmediate(function() { - if (!self._dequeue(item)) - return; - - handle.finishShutdown(req, 0); - }); - }); - return 0; -}; - -StreamWrap.prototype.doWrite = function doWrite(req, bufs) { - const self = this; - const handle = self._handle; - - var pending = bufs.length; - - // Queue the request to be able to cancel it - const item = self._enqueue('write', req); - - self.stream.cork(); - for (var n = 0; n < bufs.length; n++) - self.stream.write(bufs[n], done); - self.stream.uncork(); - - function done(err) { - if (!err && --pending !== 0) - return; - - // Ensure that this is called once in case of error - pending = 0; - - let errCode = 0; - if (err) { - const code = uv[`UV_${err.code}`]; - errCode = (err.code && code) ? code : uv.UV_EPIPE; - } - - // Ensure that write was dispatched - setImmediate(function() { - // Do not invoke callback twice - if (!self._dequeue(item)) - return; - - handle.doAfterWrite(req); - handle.finishWrite(req, errCode); - }); - } - - return 0; -}; - -function QueueItem(type, req) { - this.type = type; - this.req = req; - this.prev = this; - this.next = this; -} - -StreamWrap.prototype._enqueue = function _enqueue(type, req) { - const item = new QueueItem(type, req); - if (this._list === null) { - this._list = item; - return item; - } - - item.next = this._list.next; - item.prev = this._list; - item.next.prev = item; - item.prev.next = item; - - return item; -}; - -StreamWrap.prototype._dequeue = function _dequeue(item) { - assert(item instanceof QueueItem); - - var next = item.next; - var prev = item.prev; - - if (next === null && prev === null) - return false; - - item.next = null; - item.prev = null; - - if (next === item) { - prev = null; - next = null; - } else { - prev.next = next; - next.prev = prev; - } - - if (this._list === item) - this._list = next; - - return true; -}; - -StreamWrap.prototype.doClose = function doClose(cb) { - const self = this; - const handle = self._handle; - - setImmediate(function() { - while (self._list !== null) { - const item = self._list; - const req = item.req; - self._dequeue(item); - - const errCode = uv.UV_ECANCELED; - if (item.type === 'write') { - handle.doAfterWrite(req); - handle.finishWrite(req, errCode); - } else if (item.type === 'shutdown') { - handle.finishShutdown(req, errCode); - } - } - - // Should be already set by net.js - assert(self._handle === null); - cb(); - }); -}; +module.exports = require('internal/wrap_js_stream'); diff --git a/lib/internal/wrap_js_stream.js b/lib/internal/wrap_js_stream.js new file mode 100644 index 00000000000000..73196fd0a2baee --- /dev/null +++ b/lib/internal/wrap_js_stream.js @@ -0,0 +1,220 @@ +'use strict'; + +const assert = require('assert'); +const util = require('util'); +const { Socket } = require('net'); +const { JSStream } = process.binding('js_stream'); +const uv = process.binding('uv'); +const debug = util.debuglog('stream_wrap'); +const errors = require('internal/errors'); + +/* This class serves as a wrapper for when the C++ side of Node wants access + * to a standard JS stream. For example, TLS or HTTP do not operate on network + * resources conceptually, although that is the common case and what we are + * optimizing for; in theory, they are completely composable and can work with + * any stream resource they see. + * + * For the common case, i.e. a TLS socket wrapping around a net.Socket, we + * can skip going through the JS layer and let TLS access the raw C++ handle + * of a net.Socket. The flipside of this is that, to maintain composability, + * we need a way to create "fake" net.Socket instances that call back into a + * "real" JavaScript stream. JSStreamWrap is exactly this. + */ +class JSStreamWrap extends Socket { + constructor(stream) { + const handle = new JSStream(); + handle.close = (cb) => { + debug('close'); + this.doClose(cb); + }; + handle.isAlive = () => this.isAlive(); + handle.isClosing = () => this.isClosing(); + handle.onreadstart = () => this.readStart(); + handle.onreadstop = () => this.readStop(); + handle.onshutdown = (req) => this.doShutdown(req); + handle.onwrite = (req, bufs) => this.doWrite(req, bufs); + + stream.pause(); + stream.on('error', (err) => this.emit('error', err)); + const ondata = (chunk) => { + if (typeof chunk === 'string' || + stream._readableState.objectMode === true) { + // Make sure that no further `data` events will happen. + stream.pause(); + stream.removeListener('data', ondata); + + this.emit('error', new errors.Error('ERR_STREAM_WRAP')); + return; + } + + debug('data', chunk.length); + if (this._handle) + this._handle.readBuffer(chunk); + }; + stream.on('data', ondata); + stream.once('end', () => { + debug('end'); + if (this._handle) + this._handle.emitEOF(); + }); + + super({ handle, manualStart: true }); + this.stream = stream; + this._list = null; + this.read(0); + } + + // Legacy + static get StreamWrap() { + return JSStreamWrap; + } + + isAlive() { + return true; + } + + isClosing() { + return !this.readable || !this.writable; + } + + readStart() { + this.stream.resume(); + return 0; + } + + readStop() { + this.stream.pause(); + return 0; + } + + doShutdown(req) { + const handle = this._handle; + const item = this._enqueue('shutdown', req); + + this.stream.end(() => { + // Ensure that write was dispatched + setImmediate(() => { + if (!this._dequeue(item)) + return; + + handle.finishShutdown(req, 0); + }); + }); + return 0; + } + + doWrite(req, bufs) { + const self = this; + const handle = this._handle; + + var pending = bufs.length; + + // Queue the request to be able to cancel it + const item = this._enqueue('write', req); + + this.stream.cork(); + for (var n = 0; n < bufs.length; n++) + this.stream.write(bufs[n], done); + this.stream.uncork(); + + function done(err) { + if (!err && --pending !== 0) + return; + + // Ensure that this is called once in case of error + pending = 0; + + let errCode = 0; + if (err) { + const code = uv[`UV_${err.code}`]; + errCode = (err.code && code) ? code : uv.UV_EPIPE; + } + + // Ensure that write was dispatched + setImmediate(function() { + // Do not invoke callback twice + if (!self._dequeue(item)) + return; + + handle.doAfterWrite(req); + handle.finishWrite(req, errCode); + }); + } + + return 0; + } + + _enqueue(type, req) { + const item = new QueueItem(type, req); + if (this._list === null) { + this._list = item; + return item; + } + + item.next = this._list.next; + item.prev = this._list; + item.next.prev = item; + item.prev.next = item; + + return item; + } + + _dequeue(item) { + assert(item instanceof QueueItem); + + var next = item.next; + var prev = item.prev; + + if (next === null && prev === null) + return false; + + item.next = null; + item.prev = null; + + if (next === item) { + prev = null; + next = null; + } else { + prev.next = next; + next.prev = prev; + } + + if (this._list === item) + this._list = next; + + return true; + } + + doClose(cb) { + const handle = this._handle; + + setImmediate(() => { + while (this._list !== null) { + const item = this._list; + const req = item.req; + this._dequeue(item); + + const errCode = uv.UV_ECANCELED; + if (item.type === 'write') { + handle.doAfterWrite(req); + handle.finishWrite(req, errCode); + } else if (item.type === 'shutdown') { + handle.finishShutdown(req, errCode); + } + } + + // Should be already set by net.js + assert.strictEqual(this._handle, null); + cb(); + }); + } +} + +function QueueItem(type, req) { + this.type = type; + this.req = req; + this.prev = this; + this.next = this; +} + +module.exports = JSStreamWrap; diff --git a/lib/net.js b/lib/net.js index 3c97110e3fbe15..53b9d33f485596 100644 --- a/lib/net.js +++ b/lib/net.js @@ -245,7 +245,7 @@ function Socket(options) { this._handle.reading = false; this._handle.readStop(); this._readableState.flowing = false; - } else { + } else if (!options.manualStart) { this.read(0); } } diff --git a/node.gyp b/node.gyp index 66949aa3b8bb5a..4b9e50e6dce307 100644 --- a/node.gyp +++ b/node.gyp @@ -133,6 +133,7 @@ 'lib/internal/streams/BufferList.js', 'lib/internal/streams/legacy.js', 'lib/internal/streams/destroy.js', + 'lib/internal/wrap_js_stream.js', 'deps/v8/tools/splaytree.js', 'deps/v8/tools/codemap.js', 'deps/v8/tools/consarray.js', diff --git a/test/parallel/test-tls-wrap-event-emmiter.js b/test/parallel/test-tls-wrap-event-emmiter.js index 82953f1333e5da..b6ae9e2d5a7e99 100644 --- a/test/parallel/test-tls-wrap-event-emmiter.js +++ b/test/parallel/test-tls-wrap-event-emmiter.js @@ -15,5 +15,5 @@ const TlsSocket = require('tls').TLSSocket; const EventEmitter = require('events').EventEmitter; assert.throws( () => { new TlsSocket(new EventEmitter()); }, - /^TypeError: this\.stream\.pause is not a function/ + /^TypeError: (.+) is not a function$/ );