Skip to content

Commit

Permalink
stream: move _end to state
Browse files Browse the repository at this point in the history
  • Loading branch information
indutny committed Mar 22, 2015
1 parent dc36915 commit 1dd3af6
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 14 deletions.
17 changes: 11 additions & 6 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ function WriteReq(chunk, encoding, cb) {
this.next = null;
}


function _end(stream, cb) {
cb(null);
}

function WritableState(options, stream) {
options = options || {};

Expand Down Expand Up @@ -108,8 +113,12 @@ function WritableState(options, stream) {
// True if the error was already emitted and should not be thrown again
this.errorEmitted = false;

// when `_end()` was called
// when `state._end()` was called
this.flushed = false;

// NOTE: added here to not pollute the prototype of the WritableStream and to
// avoid conflicts with user-land methods
this._end = options._end || _end;
}

WritableState.prototype.getBuffer = function writableStateGetBuffer() {
Expand Down Expand Up @@ -429,10 +438,6 @@ Writable.prototype._write = function(chunk, encoding, cb) {

Writable.prototype._writev = null;

Writable.prototype._end = function(cb) {
cb(null);
};

Writable.prototype.end = function(chunk, encoding, cb) {
var state = this._writableState;

Expand Down Expand Up @@ -482,7 +487,7 @@ function finishMaybe(stream, state) {
if (state.pendingcb === 0) {
prefinish(stream, state);
stream.flushed = true;
stream._end(function(err) {
state._end(stream, function(err) {
if (err)
stream.emit('error', err);

Expand Down
19 changes: 11 additions & 8 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ function Socket(options) {

stream.Duplex.call(this, options);

// NOTE: do it here to avoid copying `options`
this._writableState._end = _end;

if (options.handle) {
this._handle = options.handle; // private
} else if (options.fd !== undefined) {
Expand Down Expand Up @@ -663,38 +666,38 @@ Socket.prototype._write = function(data, encoding, cb) {
this._writeGeneric(false, data, encoding, cb);
};

Socket.prototype._end = function _end(cb) {
function _end(socket, cb) {
debug('_end');

// If still connecting - defer handling 'finish' until 'connect' will happen
if (this._connecting) {
if (socket._connecting) {
debug('_end: not yet connected');
return this.once('connect', function() {
this._end(cb);
return socket.once('connect', function() {
socket._end(cb);
});
}

if (!this.readable || this._readableState.ended) {
if (!socket.readable || socket._readableState.ended) {
debug('_end: not readable or ended');
return cb();
}

// otherwise, just shutdown, or destroy() if not possible
if (!this._handle || !this._handle.shutdown) {
if (!socket._handle || !socket._handle.shutdown) {
debug('_end: no handle or handle does not support shutdown');
return cb();
}

var req = new ShutdownWrap();
req.oncomplete = afterShutdown;
req.flushCb = cb;
var err = this._handle.shutdown(req);
var err = socket._handle.shutdown(req);

if (err) {
debug('_end: errno %s', err);
return cb(errnoException(err, 'shutdown'));
}
};
}


function afterShutdown(status, handle, req) {
Expand Down

0 comments on commit 1dd3af6

Please sign in to comment.