Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib: merge onread handlers for http2 streams & net.Socket #22449

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 5 additions & 33 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ const {
const {
createWriteWrap,
writeGeneric,
writevGeneric
writevGeneric,
onStreamRead,
kMaybeDestroy,
kUpdateTimer
} = require('internal/stream_base_commons');
const {
kTimeout,
Expand Down Expand Up @@ -142,7 +145,6 @@ const kHandle = Symbol('handle');
const kID = Symbol('id');
const kInit = Symbol('init');
const kInfoHeaders = Symbol('sent-info-headers');
const kMaybeDestroy = Symbol('maybe-destroy');
const kLocalSettings = Symbol('local-settings');
const kOptions = Symbol('options');
const kOwner = owner_symbol;
Expand All @@ -156,7 +158,6 @@ const kServer = Symbol('server');
const kSession = Symbol('session');
const kState = Symbol('state');
const kType = Symbol('type');
const kUpdateTimer = Symbol('update-timer');
const kWriteGeneric = Symbol('write-generic');

const kDefaultSocketTimeout = 2 * 60 * 1000;
Expand Down Expand Up @@ -374,36 +375,6 @@ function onStreamClose(code) {
}
}

// Receives a chunk of data for a given stream and forwards it on
// to the Http2Stream Duplex for processing.
function onStreamRead(nread, buf) {
const stream = this[kOwner];
if (nread >= 0 && !stream.destroyed) {
debug(`Http2Stream ${stream[kID]} [Http2Session ` +
`${sessionName(stream[kSession][kType])}]: receiving data chunk ` +
`of size ${nread}`);
stream[kUpdateTimer]();
if (!stream.push(buf)) {
if (!stream.destroyed) // we have to check a second time
this.readStop();
}
return;
}

// Last chunk was received. End the readable side.
debug(`Http2Stream ${stream[kID]} [Http2Session ` +
`${sessionName(stream[kSession][kType])}]: ending readable.`);

// defer this until we actually emit end
if (!stream.readable) {
stream[kMaybeDestroy]();
} else {
stream.on('end', stream[kMaybeDestroy]);
stream.push(null);
stream.read(0);
}
}

// Called when the remote peer settings have been updated.
// Resets the cached settings.
function onSettings() {
Expand Down Expand Up @@ -2145,6 +2116,7 @@ function afterOpen(session, options, headers, streamOptions, err, fd) {
class ServerHttp2Stream extends Http2Stream {
constructor(session, handle, id, options, headers) {
super(session, options);
handle.owner = this;
this[kInit](id, handle);
this[kProtocol] = headers[HTTP2_HEADER_SCHEME];
this[kAuthority] = headers[HTTP2_HEADER_AUTHORITY];
Expand Down
55 changes: 52 additions & 3 deletions lib/internal/stream_base_commons.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
'use strict';

const { Buffer } = require('buffer');
const errors = require('internal/errors');
const { internalBinding } = require('internal/bootstrap/loaders');
const { WriteWrap } = internalBinding('stream_wrap');
const { UV_EOF } = internalBinding('uv');
const { errnoException } = require('internal/errors');
const { owner_symbol } = require('internal/async_hooks').symbols;

const errnoException = errors.errnoException;
const kMaybeDestroy = Symbol('kMaybeDestroy');
const kUpdateTimer = Symbol('kUpdateTimer');

function handleWriteReq(req, data, encoding) {
const { handle } = req;
Expand Down Expand Up @@ -82,8 +85,54 @@ function afterWriteDispatched(self, req, err, cb) {
}
}

function onStreamRead(nread, buf) {
const handle = this;
const stream = this[owner_symbol];

stream[kUpdateTimer]();

if (nread > 0 && !stream.destroyed) {
if (!stream.push(buf)) {
handle.reading = false;
if (!stream.destroyed) {
const err = handle.readStop();
if (err)
stream.destroy(errnoException(err, 'read'));
}
}

return;
}

if (nread === 0) {
return;
}

if (nread !== UV_EOF) {
return stream.destroy(errnoException(nread, 'read'));
}

// defer this until we actually emit end
if (stream._readableState.endEmitted) {
if (stream[kMaybeDestroy])
stream[kMaybeDestroy]();
} else {
if (stream[kMaybeDestroy])
stream.on('end', stream[kMaybeDestroy]);

// push a null to signal the end of data.
// Do it before `maybeDestroy` for correct order of events:
// `end` -> `close`
stream.push(null);
stream.read(0);
}
}

module.exports = {
createWriteWrap,
writevGeneric,
writeGeneric
writeGeneric,
onStreamRead,
kMaybeDestroy,
kUpdateTimer,
};
70 changes: 11 additions & 59 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ const assert = require('assert');
const { internalBinding } = require('internal/bootstrap/loaders');
const {
UV_EADDRINUSE,
UV_EINVAL,
UV_EOF
UV_EINVAL
} = internalBinding('uv');

const { Buffer } = require('buffer');
Expand All @@ -62,7 +61,9 @@ const {
const {
createWriteWrap,
writevGeneric,
writeGeneric
writeGeneric,
onStreamRead,
kUpdateTimer
} = require('internal/stream_base_commons');
const errors = require('internal/errors');
const {
Expand Down Expand Up @@ -209,7 +210,7 @@ function initSocketHandle(self) {
// Handle creation may be deferred to bind() or connect() time.
if (self._handle) {
self._handle[owner_symbol] = self;
self._handle.onread = onread;
self._handle.onread = onStreamRead;
self[async_id_symbol] = getNewAsyncId(self._handle);
}
}
Expand Down Expand Up @@ -515,6 +516,12 @@ Object.defineProperty(Socket.prototype, 'bufferSize', {
}
});

Object.defineProperty(Socket.prototype, kUpdateTimer, {
get: function() {
return this._unrefTimer;
}
});


// Just call handle.readStart until we have enough in the buffer
Socket.prototype._read = function(n) {
Expand Down Expand Up @@ -616,61 +623,6 @@ Socket.prototype._destroy = function(exception, cb) {
}
};


// This function is called whenever the handle gets a
// buffer, or when there's an error reading.
function onread(nread, buffer) {
var handle = this;
var self = handle[owner_symbol];
assert(handle === self._handle, 'handle != self._handle');

self._unrefTimer();

debug('onread', nread);

if (nread > 0) {
debug('got data');

// read success.
// In theory (and in practice) calling readStop right now
// will prevent this from being called again until _read() gets
// called again.

// Optimization: emit the original buffer with end points
var ret = self.push(buffer);

if (handle.reading && !ret) {
handle.reading = false;
debug('readStop');
var err = handle.readStop();
if (err)
self.destroy(errnoException(err, 'read'));
}
return;
}

// if we didn't get any bytes, that doesn't necessarily mean EOF.
// wait for the next one.
if (nread === 0) {
debug('not any data, keep waiting');
return;
}

// Error, possibly EOF.
if (nread !== UV_EOF) {
return self.destroy(errnoException(nread, 'read'));
}

debug('EOF');

// push a null to signal the end of data.
// Do it before `maybeDestroy` for correct order of events:
// `end` -> `close`
self.push(null);
self.read(0);
}


Socket.prototype._getpeername = function() {
if (!this._peername) {
if (!this._handle || !this._handle.getpeername) {
Expand Down