Skip to content

Commit

Permalink
http2: refactor read mechanism
Browse files Browse the repository at this point in the history
Refactor the read mechanism to completely avoid copying.

Instead of copying individual `DATA` frame contents into buffers,
create `ArrayBuffer` instances for all socket reads and emit
slices of those `ArrayBuffer`s to JS.

PR-URL: #18030
Reviewed-By: James M Snell <[email protected]>
  • Loading branch information
addaleax authored and evanlucas committed Jan 30, 2018
1 parent 077bcbd commit b3332cc
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 127 deletions.
30 changes: 12 additions & 18 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -277,16 +277,15 @@ function submitRstStream(code) {
// point, close them. If there is an open fd for file send, close that also.
// At this point the underlying node::http2:Http2Stream handle is no
// longer usable so destroy it also.
function onStreamClose(code, hasData) {
function onStreamClose(code) {
const stream = this[kOwner];
if (stream.destroyed)
return;

const state = stream[kState];

debug(`Http2Stream ${stream[kID]} [Http2Session ` +
`${sessionName(stream[kSession][kType])}]: closed with code ${code}` +
` [has data? ${hasData}]`);
`${sessionName(stream[kSession][kType])}]: closed with code ${code}`);

if (!stream.closed) {
// Unenroll from timeouts
Expand All @@ -304,21 +303,22 @@ function onStreamClose(code, hasData) {

if (state.fd !== undefined)
tryClose(state.fd);
stream[kMaybeDestroy](null, code, hasData);
stream.push(null);
stream[kMaybeDestroy](null, code);
}

// Receives a chunk of data for a given stream and forwards it on
// to the Http2Stream Duplex for processing.
function onStreamRead(nread, buf, handle) {
const stream = handle[kOwner];
function onStreamRead(nread, buf) {
const stream = this[kOwner];
if (nread >= 0 && !stream.destroyed) {
debug(`Http2Stream ${stream[kID]} [Http2Session ` +
`${sessionName(stream[kSession][kType])}]: receiving data chunk ` +
`of size ${nread}`);
stream[kUpdateTimer]();
if (!stream.push(buf)) {
if (!stream.destroyed) // we have to check a second time
handle.readStop();
this.readStop();
}
return;
}
Expand Down Expand Up @@ -1427,13 +1427,8 @@ function streamOnResume() {
}

function streamOnPause() {
// if (!this.destroyed && !this.pending)
// this[kHandle].readStop();
}

function handleFlushData(self) {
if (!this.destroyed && !this.pending)
this[kHandle].flushData();
this[kHandle].readStop();
}

// If the writable side of the Http2Stream is still open, emit the
Expand Down Expand Up @@ -1679,11 +1674,10 @@ class Http2Stream extends Duplex {
this.push(null);
return;
}
const flushfn = handleFlushData.bind(this);
if (!this.pending) {
flushfn();
streamOnResume.call(this);
} else {
this.once('ready', flushfn);
this.once('ready', streamOnResume);
}
}

Expand Down Expand Up @@ -1822,10 +1816,10 @@ class Http2Stream extends Duplex {

// The Http2Stream can be destroyed if it has closed and if the readable
// side has received the final chunk.
[kMaybeDestroy](error, code = NGHTTP2_NO_ERROR, hasData = true) {
[kMaybeDestroy](error, code = NGHTTP2_NO_ERROR) {
if (error == null) {
if (code === NGHTTP2_NO_ERROR &&
((!this._readableState.ended && hasData) ||
(!this._readableState.ended ||
!this._writableState.ended ||
this._writableState.pendingcb > 0 ||
!this.closed)) {
Expand Down
184 changes: 85 additions & 99 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

namespace node {

using v8::ArrayBuffer;
using v8::Boolean;
using v8::Context;
using v8::Float64Array;
Expand Down Expand Up @@ -978,7 +979,6 @@ inline int Http2Session::OnStreamClose(nghttp2_session* handle,
// Intentionally ignore the callback if the stream does not exist or has
// already been destroyed
if (stream != nullptr && !stream->IsDestroyed()) {
stream->AddChunk(nullptr, 0);
stream->Close(code);
// It is possible for the stream close to occur before the stream is
// ever passed on to the javascript side. If that happens, skip straight
Expand All @@ -989,9 +989,8 @@ inline int Http2Session::OnStreamClose(nghttp2_session* handle,
stream->object()->Get(context, env->onstreamclose_string())
.ToLocalChecked();
if (fn->IsFunction()) {
Local<Value> argv[2] = {
Integer::NewFromUnsigned(isolate, code),
Boolean::New(isolate, stream->HasDataChunks(true))
Local<Value> argv[] = {
Integer::NewFromUnsigned(isolate, code)
};
stream->MakeCallback(fn.As<Function>(), arraysize(argv), argv);
} else {
Expand Down Expand Up @@ -1028,6 +1027,8 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
Http2Session* session = static_cast<Http2Session*>(user_data);
DEBUG_HTTP2SESSION2(session, "buffering data chunk for stream %d, size: "
"%d, flags: %d", id, len, flags);
Environment* env = session->env();
HandleScope scope(env->isolate());
// We should never actually get a 0-length chunk so this check is
// only a precaution at this point.
if (len > 0) {
Expand All @@ -1039,8 +1040,25 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
// If the stream has been destroyed, ignore this chunk
if (stream->IsDestroyed())
return 0;

stream->statistics_.received_bytes += len;
stream->AddChunk(data, len);

// There is a single large array buffer for the entire data read from the
// network; create a slice of that array buffer and emit it as the
// received data buffer.
CHECK(!session->stream_buf_ab_.IsEmpty());
size_t offset = reinterpret_cast<const char*>(data) - session->stream_buf_;
// Verify that the data offset is inside the current read buffer.
CHECK_LE(offset, session->stream_buf_size_);

Local<Object> buf =
Buffer::New(env, session->stream_buf_ab_, offset, len).ToLocalChecked();

stream->EmitData(len, buf, Local<Object>());
if (!stream->IsReading())
stream->inbound_consumed_data_while_paused_ += len;
else
nghttp2_session_consume_stream(handle, id, len);
}
return 0;
}
Expand Down Expand Up @@ -1226,9 +1244,8 @@ inline void Http2Session::HandlePriorityFrame(const nghttp2_frame* frame) {


// Called by OnFrameReceived when a complete DATA frame has been received.
// If we know that this is the last DATA frame (because the END_STREAM flag
// is set), then we'll terminate the readable side of the StreamBase. If
// the StreamBase is flowing, we'll push the chunks of data out to JS land.
// If we know that this was the last DATA frame (because the END_STREAM flag
// is set), then we'll terminate the readable side of the StreamBase.
inline void Http2Session::HandleDataFrame(const nghttp2_frame* frame) {
int32_t id = GetFrameID(frame);
DEBUG_HTTP2SESSION2(this, "handling data frame for stream %d", id);
Expand All @@ -1239,11 +1256,8 @@ inline void Http2Session::HandleDataFrame(const nghttp2_frame* frame) {
return;

if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
stream->AddChunk(nullptr, 0);
stream->EmitData(UV_EOF, Local<Object>(), Local<Object>());
}

if (stream->IsReading())
stream->FlushDataChunks();
}


Expand Down Expand Up @@ -1618,45 +1632,67 @@ void Http2Session::OnStreamAllocImpl(size_t suggested_size,
uv_buf_t* buf,
void* ctx) {
Http2Session* session = static_cast<Http2Session*>(ctx);
buf->base = session->stream_alloc();
buf->len = kAllocBufferSize;
CHECK_EQ(session->stream_buf_, nullptr);
CHECK_EQ(session->stream_buf_size_, 0);
buf->base = session->stream_buf_ = Malloc(suggested_size);
buf->len = session->stream_buf_size_ = suggested_size;
session->IncrementCurrentSessionMemory(suggested_size);
}

// Callback used to receive inbound data from the i/o stream
void Http2Session::OnStreamReadImpl(ssize_t nread,
const uv_buf_t* bufs,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx) {
Http2Session* session = static_cast<Http2Session*>(ctx);
Http2Scope h2scope(session);
CHECK_NE(session->stream_, nullptr);
DEBUG_HTTP2SESSION2(session, "receiving %d bytes", nread);
if (nread < 0) {
uv_buf_t tmp_buf;
tmp_buf.base = nullptr;
tmp_buf.len = 0;
session->prev_read_cb_.fn(nread,
&tmp_buf,
pending,
session->prev_read_cb_.ctx);
return;
}
if (bufs->len > 0) {
if (nread <= 0) {
free(session->stream_buf_);
if (nread < 0) {
uv_buf_t tmp_buf = uv_buf_init(nullptr, 0);
session->prev_read_cb_.fn(nread,
&tmp_buf,
pending,
session->prev_read_cb_.ctx);
}
} else {
// Only pass data on if nread > 0
uv_buf_t buf[] { uv_buf_init((*bufs).base, nread) };

// Verify that currently: There is memory allocated into which
// the data has been read, and that memory buffer is at least as large
// as the amount of data we have read, but we have not yet made an
// ArrayBuffer out of it.
CHECK_NE(session->stream_buf_, nullptr);
CHECK_EQ(session->stream_buf_, buf->base);
CHECK_EQ(session->stream_buf_size_, buf->len);
CHECK_GE(session->stream_buf_size_, static_cast<size_t>(nread));
CHECK(session->stream_buf_ab_.IsEmpty());

Environment* env = session->env();
Isolate* isolate = env->isolate();
HandleScope scope(isolate);
Local<Context> context = env->context();
Context::Scope context_scope(context);

// Create an array buffer for the read data. DATA frames will be emitted
// as slices of this array buffer to avoid having to copy memory.
session->stream_buf_ab_ =
ArrayBuffer::New(isolate,
session->stream_buf_,
session->stream_buf_size_,
v8::ArrayBufferCreationMode::kInternalized);

uv_buf_t buf_ = uv_buf_init(buf->base, nread);
session->statistics_.data_received += nread;
ssize_t ret = session->Write(buf, 1);
ssize_t ret = session->Write(&buf_, 1);

// Note: if ssize_t is not defined (e.g. on Win32), nghttp2 will typedef
// ssize_t to int. Cast here so that the < 0 check actually works on
// Windows.
if (static_cast<int>(ret) < 0) {
DEBUG_HTTP2SESSION2(session, "fatal error receiving data: %d", ret);
Environment* env = session->env();
Isolate* isolate = env->isolate();
HandleScope scope(isolate);
Local<Context> context = env->context();
Context::Scope context_scope(context);

Local<Value> argv[1] = {
Integer::New(isolate, ret),
Expand All @@ -1667,6 +1703,13 @@ void Http2Session::OnStreamReadImpl(ssize_t nread,
nghttp2_session_want_read(**session));
}
}

// Since we are finished handling this write, reset the stream buffer.
// The memory has either been free()d or was handed over to V8.
session->DecrementCurrentSessionMemory(session->stream_buf_size_);
session->stream_buf_ = nullptr;
session->stream_buf_size_ = 0;
session->stream_buf_ab_ = Local<ArrayBuffer>();
}

void Http2Session::OnStreamDestructImpl(void* ctx) {
Expand Down Expand Up @@ -1781,30 +1824,6 @@ void Http2Stream::OnTrailers(const SubmitTrailers& submit_trailers) {
}
}

inline bool Http2Stream::HasDataChunks(bool ignore_eos) {
return data_chunks_.size() > (ignore_eos ? 1 : 0);
}

// Appends a chunk of received DATA frame data to this Http2Streams internal
// queue. Note that we must memcpy each chunk because of the way that nghttp2
// handles it's internal memory`.
inline void Http2Stream::AddChunk(const uint8_t* data, size_t len) {
CHECK(!this->IsDestroyed());
if (this->statistics_.first_byte == 0)
this->statistics_.first_byte = uv_hrtime();
if (flags_ & NGHTTP2_STREAM_FLAG_EOS)
return;
char* buf = nullptr;
if (len > 0 && data != nullptr) {
buf = Malloc<char>(len);
memcpy(buf, data, len);
} else if (data == nullptr) {
flags_ |= NGHTTP2_STREAM_FLAG_EOS;
}
data_chunks_.emplace(uv_buf_init(buf, len));
}


inline void Http2Stream::Close(int32_t code) {
CHECK(!this->IsDestroyed());
flags_ |= NGHTTP2_STREAM_FLAG_CLOSED;
Expand Down Expand Up @@ -1841,13 +1860,6 @@ inline void Http2Stream::Destroy() {

DEBUG_HTTP2STREAM(this, "destroying stream");

// Free any remaining incoming data chunks.
while (!data_chunks_.empty()) {
uv_buf_t buf = data_chunks_.front();
free(buf.base);
data_chunks_.pop();
}

// Wait until the start of the next loop to delete because there
// may still be some pending operations queued for this stream.
env()->SetImmediate([](Environment* env, void* data) {
Expand All @@ -1873,39 +1885,6 @@ inline void Http2Stream::Destroy() {
}


// Uses the StreamBase API to push a single chunk of queued inbound DATA
// to JS land.
void Http2Stream::OnDataChunk(uv_buf_t* chunk) {
CHECK(!this->IsDestroyed());
Isolate* isolate = env()->isolate();
HandleScope scope(isolate);
ssize_t len = -1;
Local<Object> buf;
if (chunk != nullptr) {
len = chunk->len;
buf = Buffer::New(isolate, chunk->base, len).ToLocalChecked();
}
EmitData(len, buf, this->object());
}


inline void Http2Stream::FlushDataChunks() {
CHECK(!this->IsDestroyed());
Http2Scope h2scope(this);
if (!data_chunks_.empty()) {
uv_buf_t buf = data_chunks_.front();
data_chunks_.pop();
if (buf.len > 0) {
CHECK_EQ(nghttp2_session_consume_stream(session_->session(),
id_, buf.len), 0);
OnDataChunk(&buf);
} else {
OnDataChunk(nullptr);
}
}
}


// Initiates a response on the Http2Stream using data provided via the
// StreamBase Streams API.
inline int Http2Stream::SubmitResponse(nghttp2_nv* nva,
Expand Down Expand Up @@ -2012,13 +1991,20 @@ inline Http2Stream* Http2Stream::SubmitPushPromise(nghttp2_nv* nva,
// Switch the StreamBase into flowing mode to begin pushing chunks of data
// out to JS land.
inline int Http2Stream::ReadStart() {
Http2Scope h2scope(this);
CHECK(!this->IsDestroyed());
flags_ |= NGHTTP2_STREAM_FLAG_READ_START;
flags_ &= ~NGHTTP2_STREAM_FLAG_READ_PAUSED;

// Flush any queued data chunks immediately out to the JS layer
FlushDataChunks();
DEBUG_HTTP2STREAM(this, "reading starting");

// Tell nghttp2 about our consumption of the data that was handed
// off to JS land.
nghttp2_session_consume_stream(session_->session(),
id_,
inbound_consumed_data_while_paused_);
inbound_consumed_data_while_paused_ = 0;

return 0;
}

Expand Down
Loading

0 comments on commit b3332cc

Please sign in to comment.