Skip to content

Commit

Permalink
http2: refactor how trailers are done
Browse files Browse the repository at this point in the history
Rather than an option, introduce a method and an event...

```js
server.on('stream', (stream) => {
  stream.respond(undefined, { waitForTrailers: true });
  stream.on('wantTrailers', () => {
    stream.sendTrailers({ abc: 'xyz'});
  });
  stream.end('hello world');
});
```

This is a breaking change in the API such that the prior
`options.getTrailers` is no longer supported at all.
Ordinarily this would be semver-major and require a
deprecation but the http2 stuff is still experimental.

PR-URL: #19959
Reviewed-By: Yuta Hiroto <[email protected]>
Reviewed-By: Anna Henningsen <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
  • Loading branch information
jasnell committed Apr 16, 2018
1 parent fc0ddaa commit 2ecdb6d
Show file tree
Hide file tree
Showing 17 changed files with 329 additions and 285 deletions.
13 changes: 13 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,19 @@ When setting the priority for an HTTP/2 stream, the stream may be marked as
a dependency for a parent stream. This error code is used when an attempt is
made to mark a stream and dependent of itself.

<a id="ERR_HTTP2_TRAILERS_ALREADY_SENT"></a>
### ERR_HTTP2_TRAILERS_ALREADY_SENT

Trailing headers have already been sent on the `Http2Stream`.

<a id="ERR_HTTP2_TRAILERS_NOT_READY"></a>
### ERR_HTTP2_TRAILERS_NOT_READY

The `http2stream.sendTrailers()` method cannot be called until after the
`'wantTrailers'` event is emitted on an `Http2Stream` object. The
`'wantTrailers'` event will only be emitted if the `waitForTrailers` option
is set for the `Http2Stream`.

<a id="ERR_HTTP2_UNSUPPORTED_PROTOCOL"></a>
### ERR_HTTP2_UNSUPPORTED_PROTOCOL

Expand Down
191 changes: 116 additions & 75 deletions doc/api/http2.md

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,11 @@ E('ERR_HTTP2_STREAM_CANCEL', 'The pending stream has been canceled', Error);
E('ERR_HTTP2_STREAM_ERROR', 'Stream closed with error code %s', Error);
E('ERR_HTTP2_STREAM_SELF_DEPENDENCY',
'A stream cannot depend on itself', Error);
E('ERR_HTTP2_TRAILERS_ALREADY_SENT',
'Trailing headers have already been sent', Error);
E('ERR_HTTP2_TRAILERS_NOT_READY',
'Trailing headers cannot be sent until after the wantTrailers event is ' +
'emitted', Error);
E('ERR_HTTP2_UNSUPPORTED_PROTOCOL', 'protocol "%s" is unsupported.', Error);
E('ERR_HTTP_HEADERS_SENT',
'Cannot %s headers after they are sent to the client', Error);
Expand Down
7 changes: 6 additions & 1 deletion lib/internal/http2/compat.js
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,10 @@ class Http2ServerRequest extends Readable {
}
}

function onStreamTrailersReady() {
this[kStream].sendTrailers(this[kTrailers]);
}

class Http2ServerResponse extends Stream {
constructor(stream, options) {
super(options);
Expand All @@ -377,6 +381,7 @@ class Http2ServerResponse extends Stream {
stream.on('drain', onStreamDrain);
stream.on('aborted', onStreamAbortedResponse);
stream.on('close', this[kFinish].bind(this));
stream.on('wantTrailers', onStreamTrailersReady.bind(this));
}

// User land modules such as finalhandler just check truthiness of this
Expand Down Expand Up @@ -648,7 +653,7 @@ class Http2ServerResponse extends Stream {
headers[HTTP2_HEADER_STATUS] = state.statusCode;
const options = {
endStream: state.ending,
getTrailers: (trailers) => Object.assign(trailers, this[kTrailers])
waitForTrailers: true,
};
this[kStream].respond(headers, options);
}
Expand Down
85 changes: 43 additions & 42 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const {
ERR_HTTP2_STREAM_CANCEL,
ERR_HTTP2_STREAM_ERROR,
ERR_HTTP2_STREAM_SELF_DEPENDENCY,
ERR_HTTP2_TRAILERS_ALREADY_SENT,
ERR_HTTP2_TRAILERS_NOT_READY,
ERR_HTTP2_UNSUPPORTED_PROTOCOL,
ERR_INVALID_ARG_TYPE,
ERR_INVALID_CALLBACK,
Expand Down Expand Up @@ -295,25 +297,18 @@ function tryClose(fd) {
fs.close(fd, (err) => assert.ifError(err));
}

// Called to determine if there are trailers to be sent at the end of a
// Stream. The 'getTrailers' callback is invoked and passed a holder object.
// The trailers to return are set on that object by the handler. Once the
// event handler returns, those are sent off for processing. Note that this
// is a necessarily synchronous operation. We need to know immediately if
// there are trailing headers to send.
// Called when the Http2Stream has finished sending data and is ready for
// trailers to be sent. This will only be called if the { hasOptions: true }
// option is set.
function onStreamTrailers() {
const stream = this[kOwner];
stream[kState].trailersReady = true;
if (stream.destroyed)
return [];
const trailers = Object.create(null);
stream[kState].getTrailers.call(stream, trailers);
const headersList = mapToHeaders(trailers, assertValidPseudoHeaderTrailer);
if (!Array.isArray(headersList)) {
stream.destroy(headersList);
return [];
return;
if (!stream.emit('wantTrailers')) {
// There are no listeners, send empty trailing HEADERS frame and close.
stream.sendTrailers({});
}
stream[kSentTrailers] = trailers;
return headersList;
}

// Submit an RST-STREAM frame to be sent to the remote peer.
Expand Down Expand Up @@ -527,10 +522,8 @@ function requestOnConnect(headers, options) {
if (options.endStream)
streamOptions |= STREAM_OPTION_EMPTY_PAYLOAD;

if (typeof options.getTrailers === 'function') {
if (options.waitForTrailers)
streamOptions |= STREAM_OPTION_GET_TRAILERS;
this[kState].getTrailers = options.getTrailers;
}

// ret will be either the reserved stream ID (if positive)
// or an error code (if negative)
Expand Down Expand Up @@ -1408,11 +1401,6 @@ class ClientHttp2Session extends Http2Session {
throw new ERR_INVALID_OPT_VALUE('endStream', options.endStream);
}

if (options.getTrailers !== undefined &&
typeof options.getTrailers !== 'function') {
throw new ERR_INVALID_OPT_VALUE('getTrailers', options.getTrailers);
}

const headersList = mapToHeaders(headers);
if (!Array.isArray(headersList))
throw headersList;
Expand Down Expand Up @@ -1504,7 +1492,8 @@ class Http2Stream extends Duplex {
this[kState] = {
flags: STREAM_FLAGS_PENDING,
rstCode: NGHTTP2_NO_ERROR,
writeQueueSize: 0
writeQueueSize: 0,
trailersReady: false
};

this.on('resume', streamOnResume);
Expand Down Expand Up @@ -1745,6 +1734,33 @@ class Http2Stream extends Duplex {
priorityFn();
}

sendTrailers(headers) {
if (this.destroyed || this.closed)
throw new ERR_HTTP2_INVALID_STREAM();
if (this[kSentTrailers])
throw new ERR_HTTP2_TRAILERS_ALREADY_SENT();
if (!this[kState].trailersReady)
throw new ERR_HTTP2_TRAILERS_NOT_READY();

assertIsObject(headers, 'headers');
headers = Object.assign(Object.create(null), headers);

const session = this[kSession];
debug(`Http2Stream ${this[kID]} [Http2Session ` +
`${sessionName(session[kType])}]: sending trailers`);

this[kUpdateTimer]();

const headersList = mapToHeaders(headers, assertValidPseudoHeaderTrailer);
if (!Array.isArray(headersList))
throw headersList;
this[kSentTrailers] = headers;

const ret = this[kHandle].trailers(headersList);
if (ret < 0)
this.destroy(new NghttpError(ret));
}

get closed() {
return !!(this[kState].flags & STREAM_FLAGS_CLOSED);
}
Expand Down Expand Up @@ -2208,13 +2224,8 @@ class ServerHttp2Stream extends Http2Stream {
if (options.endStream)
streamOptions |= STREAM_OPTION_EMPTY_PAYLOAD;

if (options.getTrailers !== undefined) {
if (typeof options.getTrailers !== 'function') {
throw new ERR_INVALID_OPT_VALUE('getTrailers', options.getTrailers);
}
if (options.waitForTrailers)
streamOptions |= STREAM_OPTION_GET_TRAILERS;
state.getTrailers = options.getTrailers;
}

headers = processHeaders(headers);
const statusCode = headers[HTTP2_HEADER_STATUS] |= 0;
Expand Down Expand Up @@ -2274,13 +2285,8 @@ class ServerHttp2Stream extends Http2Stream {
}

let streamOptions = 0;
if (options.getTrailers !== undefined) {
if (typeof options.getTrailers !== 'function') {
throw new ERR_INVALID_OPT_VALUE('getTrailers', options.getTrailers);
}
if (options.waitForTrailers)
streamOptions |= STREAM_OPTION_GET_TRAILERS;
this[kState].getTrailers = options.getTrailers;
}

if (typeof fd !== 'number')
throw new ERR_INVALID_ARG_TYPE('fd', 'number', fd);
Expand Down Expand Up @@ -2340,13 +2346,8 @@ class ServerHttp2Stream extends Http2Stream {
}

let streamOptions = 0;
if (options.getTrailers !== undefined) {
if (typeof options.getTrailers !== 'function') {
throw new ERR_INVALID_OPT_VALUE('getTrailers', options.getTrailers);
}
if (options.waitForTrailers)
streamOptions |= STREAM_OPTION_GET_TRAILERS;
this[kState].getTrailers = options.getTrailers;
}

const session = this[kSession];
debug(`Http2Stream ${this[kID]} [Http2Session ` +
Expand Down
100 changes: 40 additions & 60 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1066,16 +1066,6 @@ int Http2Session::OnNghttpError(nghttp2_session* handle,
return 0;
}

// Once all of the DATA frames for a Stream have been sent, the GetTrailers
// method calls out to JavaScript to fetch the trailing headers that need
// to be sent.
void Http2Session::GetTrailers(Http2Stream* stream, uint32_t* flags) {
if (!stream->IsDestroyed() && stream->HasTrailers()) {
Http2Stream::SubmitTrailers submit_trailers{this, stream, flags};
stream->OnTrailers(submit_trailers);
}
}

uv_buf_t Http2StreamListener::OnStreamAlloc(size_t size) {
// See the comments in Http2Session::OnDataChunkReceived
// (which is the only possible call site for this method).
Expand Down Expand Up @@ -1111,25 +1101,6 @@ void Http2StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
stream->CallJSOnreadMethod(nread, buffer);
}

Http2Stream::SubmitTrailers::SubmitTrailers(
Http2Session* session,
Http2Stream* stream,
uint32_t* flags)
: session_(session), stream_(stream), flags_(flags) { }


void Http2Stream::SubmitTrailers::Submit(nghttp2_nv* trailers,
size_t length) const {
Http2Scope h2scope(session_);
if (length == 0)
return;
DEBUG_HTTP2SESSION2(session_, "sending trailers for stream %d, count: %d",
stream_->id(), length);
*flags_ |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
CHECK_EQ(
nghttp2_submit_trailer(**session_, stream_->id(), trailers, length), 0);
}


// Called by OnFrameReceived to notify JavaScript land that a complete
// HEADERS frame has been received and processed. This method converts the
Expand Down Expand Up @@ -1725,30 +1696,6 @@ nghttp2_stream* Http2Stream::operator*() {
return nghttp2_session_find_stream(**session_, id_);
}


// Calls out to JavaScript land to fetch the actual trailer headers to send
// for this stream.
void Http2Stream::OnTrailers(const SubmitTrailers& submit_trailers) {
DEBUG_HTTP2STREAM(this, "prompting for trailers");
CHECK(!this->IsDestroyed());
Isolate* isolate = env()->isolate();
HandleScope scope(isolate);
Local<Context> context = env()->context();
Context::Scope context_scope(context);

Local<Value> ret =
MakeCallback(env()->ontrailers_string(), 0, nullptr).ToLocalChecked();
if (!ret.IsEmpty() && !IsDestroyed()) {
if (ret->IsArray()) {
Local<Array> headers = ret.As<Array>();
if (headers->Length() > 0) {
Headers trailers(isolate, context, headers);
submit_trailers.Submit(*trailers, trailers.length());
}
}
}
}

void Http2Stream::Close(int32_t code) {
CHECK(!this->IsDestroyed());
flags_ |= NGHTTP2_STREAM_FLAG_CLOSED;
Expand Down Expand Up @@ -1843,6 +1790,26 @@ int Http2Stream::SubmitInfo(nghttp2_nv* nva, size_t len) {
return ret;
}

void Http2Stream::OnTrailers() {
DEBUG_HTTP2STREAM(this, "let javascript know we are ready for trailers");
CHECK(!this->IsDestroyed());
Isolate* isolate = env()->isolate();
HandleScope scope(isolate);
Local<Context> context = env()->context();
Context::Scope context_scope(context);
MakeCallback(env()->ontrailers_string(), 0, nullptr);
}

// Submit informational headers for a stream.
int Http2Stream::SubmitTrailers(nghttp2_nv* nva, size_t len) {
CHECK(!this->IsDestroyed());
Http2Scope h2scope(this);
DEBUG_HTTP2STREAM2(this, "sending %d trailers", len);
int ret = nghttp2_submit_trailer(**session_, id_, nva, len);
CHECK_NE(ret, NGHTTP2_ERR_NOMEM);
return ret;
}

// Submit a PRIORITY frame to the connected peer.
int Http2Stream::SubmitPriority(nghttp2_priority_spec* prispec,
bool silent) {
Expand Down Expand Up @@ -2068,13 +2035,10 @@ ssize_t Http2Stream::Provider::Stream::OnRead(nghttp2_session* handle,
if (stream->queue_.empty() && !stream->IsWritable()) {
DEBUG_HTTP2SESSION2(session, "no more data for stream %d", id);
*flags |= NGHTTP2_DATA_FLAG_EOF;
session->GetTrailers(stream, flags);
// If the stream or session gets destroyed during the GetTrailers
// callback, check that here and close down the stream
if (stream->IsDestroyed())
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
if (session->IsDestroyed())
return NGHTTP2_ERR_CALLBACK_FAILURE;
if (stream->HasTrailers()) {
*flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
stream->OnTrailers();
}
}

stream->statistics_.sent_bytes += amount;
Expand Down Expand Up @@ -2361,6 +2325,21 @@ void Http2Stream::Info(const FunctionCallbackInfo<Value>& args) {
headers->Length());
}

// Submits trailing headers on the Http2Stream
void Http2Stream::Trailers(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Local<Context> context = env->context();
Isolate* isolate = env->isolate();
Http2Stream* stream;
ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder());

Local<Array> headers = args[0].As<Array>();

Headers list(isolate, context, headers);
args.GetReturnValue().Set(stream->SubmitTrailers(*list, list.length()));
DEBUG_HTTP2STREAM2(stream, "%d trailing headers sent", headers->Length());
}

// Grab the numeric id of the Http2Stream
void Http2Stream::GetID(const FunctionCallbackInfo<Value>& args) {
Http2Stream* stream;
Expand Down Expand Up @@ -2706,6 +2685,7 @@ void Initialize(Local<Object> target,
env->SetProtoMethod(stream, "priority", Http2Stream::Priority);
env->SetProtoMethod(stream, "pushPromise", Http2Stream::PushPromise);
env->SetProtoMethod(stream, "info", Http2Stream::Info);
env->SetProtoMethod(stream, "trailers", Http2Stream::Trailers);
env->SetProtoMethod(stream, "respond", Http2Stream::Respond);
env->SetProtoMethod(stream, "rstStream", Http2Stream::RstStream);
env->SetProtoMethod(stream, "refreshState", Http2Stream::RefreshState);
Expand Down
Loading

0 comments on commit 2ecdb6d

Please sign in to comment.