From c2aea93837e296f7c6a81d2981951d17caeec797 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Sun, 7 Jun 2015 00:37:35 +0200 Subject: [PATCH 01/12] _stream_wrap: prevent use after free in TLS Queued write requests should be invoked on handle close, otherwise the "consumer" might be already destroyed when the write callbacks of the "consumed" handle will be invoked. Fix: https://github.com/iojs/io.js/issues/1696 --- lib/_stream_wrap.js | 72 ++++++++++++++++++- src/js_stream.cc | 2 +- src/tls_wrap.cc | 7 +- .../parallel/test-tls-destroy-whilst-write.js | 29 ++++++++ 4 files changed, 103 insertions(+), 7 deletions(-) create mode 100644 test/parallel/test-tls-destroy-whilst-write.js diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js index 1eac2ddf9413fd..ea09d52a345cbd 100644 --- a/lib/_stream_wrap.js +++ b/lib/_stream_wrap.js @@ -10,9 +10,11 @@ function StreamWrap(stream) { this.stream = stream; + this.queue = null; + var self = this; handle.close = function(cb) { - cb(); + self.close(cb); }; handle.isAlive = function() { return self.isAlive(); @@ -35,10 +37,12 @@ function StreamWrap(stream) { this.stream.pause(); this.stream.on('data', function(chunk) { - self._handle.readBuffer(chunk); + if (self._handle) + self._handle.readBuffer(chunk); }); this.stream.once('end', function() { - self._handle.emitEOF(); + if (self._handle) + self._handle.emitEOF(); }); this.stream.on('error', function(err) { self.emit('error', err); @@ -88,6 +92,9 @@ StreamWrap.prototype.write = function write(req, bufs) { var pending = bufs.length; var self = this; + // Queue the request to be able to cancel it + self.enqueue(req); + self.stream.cork(); bufs.forEach(function(buf) { self.stream.write(buf, done); @@ -103,6 +110,10 @@ StreamWrap.prototype.write = function write(req, bufs) { // Ensure that write was dispatched setImmediate(function() { + // Do not invoke callback twice + if (!self.dequeue(req)) + return; + var errCode = 0; if (err) { if (err.code && uv['UV_' + err.code]) @@ -118,3 +129,58 @@ StreamWrap.prototype.write = function write(req, bufs) { return 0; }; + +StreamWrap.prototype.enqueue = function enqueue(req) { + if (this.queue === null) { + this.queue = req; + req._prev = req; + req._next = req; + return; + } + + req._next = this.queue._next; + this.queue._next._prev = req; + req._prev = this.queue; + this.queue._next = req; +}; + +StreamWrap.prototype.dequeue = function dequeue(req) { + var next = req._next; + var prev = req._prev; + + if (next === null && prev === null) + return false; + + req._next = null; + req._prev = null; + + if (next === prev) { + this.queue = null; + } else { + prev._next = next; + next._prev = prev; + + if (this.queue === req) + this.queue = next; + } + + return true; +}; + +StreamWrap.prototype.close = function close(cb) { + var self = this; + + setImmediate(function() { + while (self.queue !== null) { + var req = self.queue; + self.dequeue(req); + + var errCode = uv.UV_ECANCELED; + self._handle.doAfterWrite(req); + self._handle.finishWrite(req, errCode); + } + + self._handle = null; + cb(); + }); +}; diff --git a/src/js_stream.cc b/src/js_stream.cc index 6b7c4063e05a2a..91041d0201188d 100644 --- a/src/js_stream.cc +++ b/src/js_stream.cc @@ -163,7 +163,7 @@ template void JSStream::Finish(const FunctionCallbackInfo& args) { Wrap* w = Unwrap(args[0].As()); - w->Done(args[0]->Int32Value()); + w->Done(args[1]->Int32Value()); } diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index b8a648de923081..d4c7c9055d529d 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -320,6 +320,10 @@ void TLSWrap::EncOutCb(WriteWrap* req_wrap, int status) { TLSWrap* wrap = req_wrap->wrap()->Cast(); req_wrap->Dispose(); + // We should not be getting here after `DestroySSL`, because all queued writes + // must be invoked with UV_ECANCELED + CHECK_NE(wrap->ssl_, nullptr); + // Handle error if (status) { // Ignore errors after shutdown @@ -331,9 +335,6 @@ void TLSWrap::EncOutCb(WriteWrap* req_wrap, int status) { return; } - if (wrap->ssl_ == nullptr) - return; - // Commit NodeBIO::FromBIO(wrap->enc_out_)->Read(nullptr, wrap->write_size_); diff --git a/test/parallel/test-tls-destroy-whilst-write.js b/test/parallel/test-tls-destroy-whilst-write.js new file mode 100644 index 00000000000000..8b865fab178365 --- /dev/null +++ b/test/parallel/test-tls-destroy-whilst-write.js @@ -0,0 +1,29 @@ +'use strict'; +var assert = require('assert'); +var common = require('../common'); + +if (!common.hasCrypto) { + console.log('1..0 # Skipped: missing crypto'); + process.exit(); +} +var tls = require('tls'); +var stream = require('stream'); + +var delay = new stream.Duplex({ + read: function read() { + }, + write: function write(data, enc, cb) { + console.log('pending'); + setTimeout(function() { + console.log('done'); + cb(); + }, 200); + } +}); + +var secure = tls.connect({ + socket: delay +}); +setImmediate(function() { + secure.destroy(); +}); From 8d46c7b5eba218f9929bbd14bd3c9861e03a2150 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Wed, 10 Jun 2015 13:08:03 +0200 Subject: [PATCH 02/12] _stream_wrap: internalize queue --- lib/_stream_wrap.js | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js index ea09d52a345cbd..4eaf022d2117ea 100644 --- a/lib/_stream_wrap.js +++ b/lib/_stream_wrap.js @@ -10,7 +10,7 @@ function StreamWrap(stream) { this.stream = stream; - this.queue = null; + this._queue = null; var self = this; handle.close = function(cb) { @@ -93,7 +93,7 @@ StreamWrap.prototype.write = function write(req, bufs) { var self = this; // Queue the request to be able to cancel it - self.enqueue(req); + self._enqueue(req); self.stream.cork(); bufs.forEach(function(buf) { @@ -111,7 +111,7 @@ StreamWrap.prototype.write = function write(req, bufs) { // Ensure that write was dispatched setImmediate(function() { // Do not invoke callback twice - if (!self.dequeue(req)) + if (!self._dequeue(req)) return; var errCode = 0; @@ -130,21 +130,21 @@ StreamWrap.prototype.write = function write(req, bufs) { return 0; }; -StreamWrap.prototype.enqueue = function enqueue(req) { - if (this.queue === null) { - this.queue = req; +StreamWrap.prototype._enqueue = function enqueue(req) { + if (this._queue === null) { + this._queue = req; req._prev = req; req._next = req; return; } - req._next = this.queue._next; - this.queue._next._prev = req; - req._prev = this.queue; - this.queue._next = req; + req._next = this._queue._next; + this._queue._next._prev = req; + req._prev = this._queue; + this._queue._next = req; }; -StreamWrap.prototype.dequeue = function dequeue(req) { +StreamWrap.prototype._dequeue = function dequeue(req) { var next = req._next; var prev = req._prev; @@ -155,13 +155,13 @@ StreamWrap.prototype.dequeue = function dequeue(req) { req._prev = null; if (next === prev) { - this.queue = null; + this._queue = null; } else { prev._next = next; next._prev = prev; - if (this.queue === req) - this.queue = next; + if (this._queue === req) + this._queue = next; } return true; @@ -171,9 +171,9 @@ StreamWrap.prototype.close = function close(cb) { var self = this; setImmediate(function() { - while (self.queue !== null) { - var req = self.queue; - self.dequeue(req); + while (self._queue !== null) { + var req = self._queue; + self._dequeue(req); var errCode = uv.UV_ECANCELED; self._handle.doAfterWrite(req); From b349032d035f235f25e30f4d205915da86d8f1f2 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Thu, 11 Jun 2015 19:43:11 +0200 Subject: [PATCH 03/12] fix --- lib/_stream_wrap.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js index 4eaf022d2117ea..4df3bd22551514 100644 --- a/lib/_stream_wrap.js +++ b/lib/_stream_wrap.js @@ -145,8 +145,8 @@ StreamWrap.prototype._enqueue = function enqueue(req) { }; StreamWrap.prototype._dequeue = function dequeue(req) { - var next = req._next; - var prev = req._prev; + const next = req._next; + const prev = req._prev; if (next === null && prev === null) return false; @@ -172,10 +172,10 @@ StreamWrap.prototype.close = function close(cb) { setImmediate(function() { while (self._queue !== null) { - var req = self._queue; + const req = self._queue; self._dequeue(req); - var errCode = uv.UV_ECANCELED; + const errCode = uv.UV_ECANCELED; self._handle.doAfterWrite(req); self._handle.finishWrite(req, errCode); } From b27ff94741e97d4e8a0952bb7653554e9c002735 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Thu, 11 Jun 2015 19:47:15 +0200 Subject: [PATCH 04/12] fix --- lib/_stream_wrap.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js index 4df3bd22551514..c62a39cc056689 100644 --- a/lib/_stream_wrap.js +++ b/lib/_stream_wrap.js @@ -6,13 +6,13 @@ const JSStream = process.binding('js_stream').JSStream; const uv = process.binding('uv'); function StreamWrap(stream) { - var handle = new JSStream(); + const handle = new JSStream(); this.stream = stream; this._queue = null; - var self = this; + const self = this; handle.close = function(cb) { self.close(cb); }; @@ -77,7 +77,7 @@ StreamWrap.prototype.readStop = function readStop() { }; StreamWrap.prototype.shutdown = function shutdown(req) { - var self = this; + const self = this; this.stream.end(function() { // Ensure that write was dispatched @@ -89,8 +89,8 @@ StreamWrap.prototype.shutdown = function shutdown(req) { }; StreamWrap.prototype.write = function write(req, bufs) { + const self = this; var pending = bufs.length; - var self = this; // Queue the request to be able to cancel it self._enqueue(req); @@ -168,7 +168,7 @@ StreamWrap.prototype._dequeue = function dequeue(req) { }; StreamWrap.prototype.close = function close(cb) { - var self = this; + const self = this; setImmediate(function() { while (self._queue !== null) { From 94c12fe86ed14cc0380f842dd2ef6ed6a1638506 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Tue, 16 Jun 2015 11:13:00 -0400 Subject: [PATCH 05/12] fix assertion when destroying parent socket --- lib/_tls_wrap.js | 17 ++++++++++++++--- test/parallel/test-tls-connect-given-socket.js | 6 ++++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/lib/_tls_wrap.js b/lib/_tls_wrap.js index 8ccee2379d4309..446d69c0fb5bba 100644 --- a/lib/_tls_wrap.js +++ b/lib/_tls_wrap.js @@ -254,7 +254,7 @@ function TLSSocket(socket, options) { this.encrypted = true; net.Socket.call(this, { - handle: this._wrapHandle(wrap && wrap._handle), + handle: this._wrapHandle(wrap), allowHalfOpen: socket && socket.allowHalfOpen, readable: false, writable: false @@ -279,7 +279,7 @@ util.inherits(TLSSocket, net.Socket); exports.TLSSocket = TLSSocket; var proxiedMethods = [ - 'close', 'ref', 'unref', 'open', 'bind', 'listen', 'connect', 'bind6', + 'ref', 'unref', 'open', 'bind', 'listen', 'connect', 'bind6', 'connect6', 'getsockname', 'getpeername', 'setNoDelay', 'setKeepAlive', 'setSimultaneousAccepts', 'setBlocking', @@ -295,8 +295,18 @@ proxiedMethods.forEach(function(name) { }; }); -TLSSocket.prototype._wrapHandle = function(handle) { +tls_wrap.TLSWrap.prototype.close = function closeProxy() { + if (this._parentWrap && this._parentWrap._handle === this._parent) + return this._parentWrap.destroy(); + return this._parent.close.apply(this._parent, arguments); +}; + +TLSSocket.prototype._wrapHandle = function(wrap) { var res; + var handle; + + if (wrap) + handle = wrap._handle; var options = this._tlsOptions; if (!handle) { @@ -310,6 +320,7 @@ TLSSocket.prototype._wrapHandle = function(handle) { tls.createSecureContext(); res = tls_wrap.wrap(handle, context.context, options.isServer); res._parent = handle; + res._parentWrap = wrap; res._secureContext = context; res.reading = handle.reading; Object.defineProperty(handle, 'reading', { diff --git a/test/parallel/test-tls-connect-given-socket.js b/test/parallel/test-tls-connect-given-socket.js index 9e8170b13af1b7..7f3cd9901621dc 100644 --- a/test/parallel/test-tls-connect-given-socket.js +++ b/test/parallel/test-tls-connect-given-socket.js @@ -43,6 +43,8 @@ var server = tls.createServer(options, function(socket) { }); assert(client.readable); assert(client.writable); + + return client; } // Already connected socket @@ -53,6 +55,10 @@ var server = tls.createServer(options, function(socket) { // Connecting socket var connecting = net.connect(common.PORT); establish(connecting); + + // Outliving socket + var outliving = net.connect(common.PORT); + establish(outliving).destroy(); }); process.on('exit', function() { From 6cdda31b9ec563e13e67a3dcc0551645055d6906 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Tue, 16 Jun 2015 12:38:54 -0400 Subject: [PATCH 06/12] tls: migrate server, fix StreamWrap close --- lib/_stream_wrap.js | 15 ++++++---- lib/_tls_wrap.js | 19 +++++++++--- .../parallel/test-tls-connect-given-socket.js | 29 +++++++++++++------ 3 files changed, 45 insertions(+), 18 deletions(-) diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js index c62a39cc056689..d02181694c6711 100644 --- a/lib/_stream_wrap.js +++ b/lib/_stream_wrap.js @@ -1,5 +1,6 @@ 'use strict'; +const assert = require('assert'); const util = require('util'); const Socket = require('net').Socket; const JSStream = process.binding('js_stream').JSStream; @@ -90,6 +91,8 @@ StreamWrap.prototype.shutdown = function shutdown(req) { StreamWrap.prototype.write = function write(req, bufs) { const self = this; + const handle = self._handle; + var pending = bufs.length; // Queue the request to be able to cancel it @@ -122,8 +125,8 @@ StreamWrap.prototype.write = function write(req, bufs) { errCode = uv.UV_EPIPE; } - self._handle.doAfterWrite(req); - self._handle.finishWrite(req, errCode); + handle.doAfterWrite(req); + handle.finishWrite(req, errCode); }); } @@ -169,6 +172,7 @@ StreamWrap.prototype._dequeue = function dequeue(req) { StreamWrap.prototype.close = function close(cb) { const self = this; + const handle = self._handle; setImmediate(function() { while (self._queue !== null) { @@ -176,11 +180,12 @@ StreamWrap.prototype.close = function close(cb) { self._dequeue(req); const errCode = uv.UV_ECANCELED; - self._handle.doAfterWrite(req); - self._handle.finishWrite(req, errCode); + handle.doAfterWrite(req); + handle.finishWrite(req, errCode); } - self._handle = null; + // Should be already set by net.js + assert(self._handle === null); cb(); }); }; diff --git a/lib/_tls_wrap.js b/lib/_tls_wrap.js index 446d69c0fb5bba..45a10c105c40e5 100644 --- a/lib/_tls_wrap.js +++ b/lib/_tls_wrap.js @@ -295,10 +295,12 @@ proxiedMethods.forEach(function(name) { }; }); -tls_wrap.TLSWrap.prototype.close = function closeProxy() { - if (this._parentWrap && this._parentWrap._handle === this._parent) +tls_wrap.TLSWrap.prototype.close = function closeProxy(cb) { + if (this._parentWrap && this._parentWrap._handle === this._parent) { + setImmediate(cb); return this._parentWrap.destroy(); - return this._parent.close.apply(this._parent, arguments); + } + return this._parent.close(cb); }; TLSSocket.prototype._wrapHandle = function(wrap) { @@ -366,7 +368,11 @@ TLSSocket.prototype._init = function(socket, wrap) { // represent real writeQueueSize during regular writes. ssl.writeQueueSize = 1; - this.server = options.server || null; + this.server = options.server; + + // Move the server to TLSSocket + if (socket && socket.server === this.server) + socket.server = null; // For clients, we will always have either a given ca list or be using // default one @@ -429,6 +435,7 @@ TLSSocket.prototype._init = function(socket, wrap) { // set `.onsniselect` callback. if (process.features.tls_sni && options.isServer && + options.SNICallback && options.server && (options.SNICallback !== SNICallback || options.server._contexts.length)) { @@ -565,6 +572,10 @@ TLSSocket.prototype._start = function() { return; } + // Socket was destroyed before the connection was established + if (!this._handle) + return; + debug('start'); if (this._tlsOptions.requestOCSP) this._handle.requestOCSP(); diff --git a/test/parallel/test-tls-connect-given-socket.js b/test/parallel/test-tls-connect-given-socket.js index 7f3cd9901621dc..902b67aa515c02 100644 --- a/test/parallel/test-tls-connect-given-socket.js +++ b/test/parallel/test-tls-connect-given-socket.js @@ -47,18 +47,29 @@ var server = tls.createServer(options, function(socket) { return client; } - // Already connected socket - var connected = net.connect(common.PORT, function() { - establish(connected); + // Immediate death socket + var immediateDeath = net.connect(common.PORT); + establish(immediateDeath).destroy(); + + // Outliving + var outlivingTCP = net.connect(common.PORT); + outlivingTCP.on('connect', function() { + outlivingTLS.destroy(); + next(); }); + var outlivingTLS = establish(outlivingTCP); + + function next() { + // Already connected socket + var connected = net.connect(common.PORT, function() { + establish(connected); + }); - // Connecting socket - var connecting = net.connect(common.PORT); - establish(connecting); + // Connecting socket + var connecting = net.connect(common.PORT); + establish(connecting); - // Outliving socket - var outliving = net.connect(common.PORT); - establish(outliving).destroy(); + } }); process.on('exit', function() { From 8375f784a3c8a3b90b7549ddb5cfb12c9f5183c0 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Tue, 16 Jun 2015 13:05:38 -0400 Subject: [PATCH 07/12] tls: comment about moving server --- lib/_tls_wrap.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/_tls_wrap.js b/lib/_tls_wrap.js index 45a10c105c40e5..b96e577a76005b 100644 --- a/lib/_tls_wrap.js +++ b/lib/_tls_wrap.js @@ -370,7 +370,9 @@ TLSSocket.prototype._init = function(socket, wrap) { this.server = options.server; - // Move the server to TLSSocket + // Move the server to TLSSocket, otherwise both `socket.destroy()` and + // `TLSSocket.destroy()` will decrement number of connections of the TLS + // server, leading to misfiring `server.close()` callback if (socket && socket.server === this.server) socket.server = null; From 29b371380b46df36815231bf511d3f6da768adaa Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Wed, 17 Jun 2015 19:11:01 -0400 Subject: [PATCH 08/12] improvementswq --- lib/_stream_wrap.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js index d02181694c6711..3faf5d55dcba8c 100644 --- a/lib/_stream_wrap.js +++ b/lib/_stream_wrap.js @@ -142,9 +142,9 @@ StreamWrap.prototype._enqueue = function enqueue(req) { } req._next = this._queue._next; - this._queue._next._prev = req; req._prev = this._queue; - this._queue._next = req; + req._next._prev = req; + req._prev._next = req; }; StreamWrap.prototype._dequeue = function dequeue(req) { From 95bc05aada8e1121dfd07ce17d4bd633d1739799 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Wed, 17 Jun 2015 20:21:33 -0400 Subject: [PATCH 09/12] fix --- lib/_stream_wrap.js | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js index 3faf5d55dcba8c..cc7705da03f38b 100644 --- a/lib/_stream_wrap.js +++ b/lib/_stream_wrap.js @@ -148,8 +148,8 @@ StreamWrap.prototype._enqueue = function enqueue(req) { }; StreamWrap.prototype._dequeue = function dequeue(req) { - const next = req._next; - const prev = req._prev; + var next = req._next; + var prev = req._prev; if (next === null && prev === null) return false; @@ -157,16 +157,17 @@ StreamWrap.prototype._dequeue = function dequeue(req) { req._next = null; req._prev = null; - if (next === prev) { - this._queue = null; + if (next === req) { + prev = null; + next = null; } else { prev._next = next; next._prev = prev; - - if (this._queue === req) - this._queue = next; } + if (this._queue === req) + this._queue = next; + return true; }; From aff1b5fa3012abfda440f301a7fe3b4638dea597 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Sun, 28 Jun 2015 14:59:56 -0700 Subject: [PATCH 10/12] stream_wrap: fix `finishShutdown` error --- lib/_stream_wrap.js | 16 ++++++++-------- test/parallel/test-stream-wrap.js | 24 ++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 8 deletions(-) create mode 100644 test/parallel/test-stream-wrap.js diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js index cc7705da03f38b..818138ffe527bf 100644 --- a/lib/_stream_wrap.js +++ b/lib/_stream_wrap.js @@ -15,7 +15,7 @@ function StreamWrap(stream) { const self = this; handle.close = function(cb) { - self.close(cb); + self.doClose(cb); }; handle.isAlive = function() { return self.isAlive(); @@ -30,10 +30,10 @@ function StreamWrap(stream) { return self.readStop(); }; handle.onshutdown = function(req) { - return self.shutdown(req); + return self.doShutdown(req); }; handle.onwrite = function(req, bufs) { - return self.write(req, bufs); + return self.doWrite(req, bufs); }; this.stream.pause(); @@ -77,19 +77,19 @@ StreamWrap.prototype.readStop = function readStop() { return 0; }; -StreamWrap.prototype.shutdown = function shutdown(req) { - const self = this; +StreamWrap.prototype.doShutdown = function doShutdown(req) { + const handle = this._handle; this.stream.end(function() { // Ensure that write was dispatched setImmediate(function() { - self._handle.finishShutdown(req, 0); + handle.finishShutdown(req, 0); }); }); return 0; }; -StreamWrap.prototype.write = function write(req, bufs) { +StreamWrap.prototype.doWrite = function doWrite(req, bufs) { const self = this; const handle = self._handle; @@ -171,7 +171,7 @@ StreamWrap.prototype._dequeue = function dequeue(req) { return true; }; -StreamWrap.prototype.close = function close(cb) { +StreamWrap.prototype.doClose = function doClose(cb) { const self = this; const handle = self._handle; diff --git a/test/parallel/test-stream-wrap.js b/test/parallel/test-stream-wrap.js new file mode 100644 index 00000000000000..5f661d2bb4780c --- /dev/null +++ b/test/parallel/test-stream-wrap.js @@ -0,0 +1,24 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const StreamWrap = require('_stream_wrap'); +const Duplex = require('stream').Duplex; +const ShutdownWrap = process.binding('stream_wrap').ShutdownWrap; + +var stream = new Duplex({ + read: function() { + }, + write: function(data, enc, callback) { + callback(null); + } +}); + +var wrap = new StreamWrap(stream); + +var req = new ShutdownWrap(); +req.oncomplete = function() {}; +req.handle = wrap._handle; +wrap._handle.shutdown(req); + +wrap.destroy(); From b6444eeafd19c2e0eb8462a316e21e1b4b572eea Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Sun, 28 Jun 2015 18:47:32 -0700 Subject: [PATCH 11/12] stream_wrap: fix initialization order --- lib/_stream_wrap.js | 15 +++++----- test/parallel/test-stream-wrap.js | 48 ++++++++++++++++++++++--------- 2 files changed, 43 insertions(+), 20 deletions(-) diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js index 818138ffe527bf..196ed3496970c8 100644 --- a/lib/_stream_wrap.js +++ b/lib/_stream_wrap.js @@ -37,6 +37,14 @@ function StreamWrap(stream) { }; this.stream.pause(); + this.stream.on('error', function(err) { + self.emit('error', err); + }); + + Socket.call(this, { + handle: handle + }); + this.stream.on('data', function(chunk) { if (self._handle) self._handle.readBuffer(chunk); @@ -45,13 +53,6 @@ function StreamWrap(stream) { if (self._handle) self._handle.emitEOF(); }); - this.stream.on('error', function(err) { - self.emit('error', err); - }); - - Socket.call(this, { - handle: handle - }); } util.inherits(StreamWrap, Socket); module.exports = StreamWrap; diff --git a/test/parallel/test-stream-wrap.js b/test/parallel/test-stream-wrap.js index 5f661d2bb4780c..cc28a344eb1e35 100644 --- a/test/parallel/test-stream-wrap.js +++ b/test/parallel/test-stream-wrap.js @@ -6,19 +6,41 @@ const StreamWrap = require('_stream_wrap'); const Duplex = require('stream').Duplex; const ShutdownWrap = process.binding('stream_wrap').ShutdownWrap; -var stream = new Duplex({ - read: function() { - }, - write: function(data, enc, callback) { - callback(null); - } -}); +function testShutdown(callback) { + var stream = new Duplex({ + read: function() { + }, + write: function(data, enc, callback) { + callback(null); + } + }); + + var wrap = new StreamWrap(stream); + + var req = new ShutdownWrap(); + req.oncomplete = function() {}; + req.handle = wrap._handle; + wrap._handle.shutdown(req); -var wrap = new StreamWrap(stream); + wrap.destroy(); -var req = new ShutdownWrap(); -req.oncomplete = function() {}; -req.handle = wrap._handle; -wrap._handle.shutdown(req); + process.nextTick(callback); +} -wrap.destroy(); +function testReadAfterClose(callback) { + var stream = new Duplex({ + read: function() { + }, + write: function(data, enc, callback) { + callback(null); + } + }); + stream.push('data'); + stream.push(null); + + var wrap = new StreamWrap(stream); +} + +testShutdown(function() { + testReadAfterClose(); +}); From 3d3709bca7864ae69ddcc57b3741955570020376 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Mon, 29 Jun 2015 16:35:56 -0700 Subject: [PATCH 12/12] ... --- lib/_stream_wrap.js | 95 +++++++++++++++++++------------ test/parallel/test-stream-wrap.js | 35 +++++------- 2 files changed, 74 insertions(+), 56 deletions(-) diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js index 196ed3496970c8..e05a4f31332126 100644 --- a/lib/_stream_wrap.js +++ b/lib/_stream_wrap.js @@ -5,6 +5,7 @@ const util = require('util'); const Socket = require('net').Socket; const JSStream = process.binding('js_stream').JSStream; const uv = process.binding('uv'); +const debug = util.debuglog('stream_wrap'); function StreamWrap(stream) { const handle = new JSStream(); @@ -15,6 +16,7 @@ function StreamWrap(stream) { const self = this; handle.close = function(cb) { + debug('close'); self.doClose(cb); }; handle.isAlive = function() { @@ -40,18 +42,23 @@ function StreamWrap(stream) { this.stream.on('error', function(err) { self.emit('error', err); }); - - Socket.call(this, { - handle: handle - }); - this.stream.on('data', function(chunk) { - if (self._handle) - self._handle.readBuffer(chunk); + setImmediate(function() { + debug('data', chunk.length); + if (self._handle) + self._handle.readBuffer(chunk); + }); }); this.stream.once('end', function() { - if (self._handle) - self._handle.emitEOF(); + setImmediate(function() { + debug('end'); + if (self._handle) + self._handle.emitEOF(); + }); + }); + + Socket.call(this, { + handle: handle }); } util.inherits(StreamWrap, Socket); @@ -61,11 +68,11 @@ module.exports = StreamWrap; StreamWrap.StreamWrap = StreamWrap; StreamWrap.prototype.isAlive = function isAlive() { - return this.readable && this.writable; + return true; }; StreamWrap.prototype.isClosing = function isClosing() { - return !this.isAlive(); + return !this.readable || !this.writable; }; StreamWrap.prototype.readStart = function readStart() { @@ -79,11 +86,16 @@ StreamWrap.prototype.readStop = function readStop() { }; StreamWrap.prototype.doShutdown = function doShutdown(req) { + const self = this; const handle = this._handle; + const item = this._enqueue('shutdown', req); this.stream.end(function() { // Ensure that write was dispatched setImmediate(function() { + if (!self._dequeue(item)) + return; + handle.finishShutdown(req, 0); }); }); @@ -97,7 +109,7 @@ StreamWrap.prototype.doWrite = function doWrite(req, bufs) { var pending = bufs.length; // Queue the request to be able to cancel it - self._enqueue(req); + const item = self._enqueue('write', req); self.stream.cork(); bufs.forEach(function(buf) { @@ -115,7 +127,7 @@ StreamWrap.prototype.doWrite = function doWrite(req, bufs) { // Ensure that write was dispatched setImmediate(function() { // Do not invoke callback twice - if (!self._dequeue(req)) + if (!self._dequeue(item)) return; var errCode = 0; @@ -134,39 +146,47 @@ StreamWrap.prototype.doWrite = function doWrite(req, bufs) { return 0; }; -StreamWrap.prototype._enqueue = function enqueue(req) { +function QueueItem(type, req) { + this.type = type; + this.req = req; + this.prev = this; + this.next = this; +} + +StreamWrap.prototype._enqueue = function enqueue(type, req) { + const item = new QueueItem(type, req); if (this._queue === null) { - this._queue = req; - req._prev = req; - req._next = req; - return; + this._queue = item; + return item; } - req._next = this._queue._next; - req._prev = this._queue; - req._next._prev = req; - req._prev._next = req; + item.next = this._queue.next; + item.prev = this._queue; + item.next.prev = item; + item.prev.next = item; + + return item; }; -StreamWrap.prototype._dequeue = function dequeue(req) { - var next = req._next; - var prev = req._prev; +StreamWrap.prototype._dequeue = function dequeue(item) { + var next = item.next; + var prev = item.prev; if (next === null && prev === null) return false; - req._next = null; - req._prev = null; + item.next = null; + item.prev = null; - if (next === req) { + if (next === item) { prev = null; next = null; } else { - prev._next = next; - next._prev = prev; + prev.next = next; + next.prev = prev; } - if (this._queue === req) + if (this._queue === item) this._queue = next; return true; @@ -178,12 +198,17 @@ StreamWrap.prototype.doClose = function doClose(cb) { setImmediate(function() { while (self._queue !== null) { - const req = self._queue; - self._dequeue(req); + const item = self._queue; + const req = item.req; + self._dequeue(item); const errCode = uv.UV_ECANCELED; - handle.doAfterWrite(req); - handle.finishWrite(req, errCode); + if (item.type === 'write') { + handle.doAfterWrite(req); + handle.finishWrite(req, errCode); + } else if (item.type === 'shutdown') { + handle.finishShutdown(req, errCode); + } } // Should be already set by net.js diff --git a/test/parallel/test-stream-wrap.js b/test/parallel/test-stream-wrap.js index cc28a344eb1e35..e7a7ecddd2385d 100644 --- a/test/parallel/test-stream-wrap.js +++ b/test/parallel/test-stream-wrap.js @@ -6,41 +6,34 @@ const StreamWrap = require('_stream_wrap'); const Duplex = require('stream').Duplex; const ShutdownWrap = process.binding('stream_wrap').ShutdownWrap; +var done = false; + function testShutdown(callback) { var stream = new Duplex({ read: function() { }, - write: function(data, enc, callback) { - callback(null); + write: function() { } }); var wrap = new StreamWrap(stream); var req = new ShutdownWrap(); - req.oncomplete = function() {}; + req.oncomplete = function(code) { + assert(code < 0); + callback(); + }; req.handle = wrap._handle; - wrap._handle.shutdown(req); + // Close the handle to simulate wrap.destroy(); - - process.nextTick(callback); -} - -function testReadAfterClose(callback) { - var stream = new Duplex({ - read: function() { - }, - write: function(data, enc, callback) { - callback(null); - } - }); - stream.push('data'); - stream.push(null); - - var wrap = new StreamWrap(stream); + req.handle.shutdown(req); } testShutdown(function() { - testReadAfterClose(); + done = true; +}); + +process.on('exit', function() { + assert(done); });