Skip to content

Commit

Permalink
http: fully close streams on remote close in AsyncClient (envoyproxy#…
Browse files Browse the repository at this point in the history
…8467)

Signed-off-by: Mike Schore <[email protected]>
  • Loading branch information
goaway committed Oct 4, 2019
1 parent 3713ce5 commit bfd0764
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 21 deletions.
51 changes: 30 additions & 21 deletions source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ void AsyncStreamImpl::encodeHeaders(HeaderMapPtr&& headers, bool end_stream) {
ASSERT(!remote_closed_);
stream_callbacks_.onHeaders(std::move(headers), end_stream);
closeRemote(end_stream);
// At present, the router cleans up stream state as soon as the remote is closed, making a
// half-open local stream unsupported and dangerous. Ensure we close locally to trigger completion
// and keep things consistent. Another option would be to issue a stream reset here if local isn't
// yet closed, triggering cleanup along a more standardized path. However, this would require
// additional logic to handle the response completion and subsequent reset, and run the risk of
// being interpreted as a failure, when in fact no error has necessarily occurred. Gracefully
// closing seems most in-line with behavior elsewhere in Envoy for now.
closeLocal(end_stream);
}

void AsyncStreamImpl::encodeData(Buffer::Instance& data, bool end_stream) {
Expand All @@ -101,13 +109,19 @@ void AsyncStreamImpl::encodeData(Buffer::Instance& data, bool end_stream) {
ASSERT(!remote_closed_);
stream_callbacks_.onData(data, end_stream);
closeRemote(end_stream);
// Ensure we close locally on receiving a complete response; see comment in encodeHeaders for
// rationale.
closeLocal(end_stream);
}

void AsyncStreamImpl::encodeTrailers(HeaderMapPtr&& trailers) {
ENVOY_LOG(debug, "async http request response trailers:\n{}", *trailers);
ASSERT(!remote_closed_);
stream_callbacks_.onTrailers(std::move(trailers));
closeRemote(true);
// Ensure we close locally on receiving a complete response; see comment in encodeHeaders for
// rationale.
closeLocal(true);
}

void AsyncStreamImpl::sendHeaders(HeaderMap& headers, bool end_stream) {
Expand All @@ -126,6 +140,14 @@ void AsyncStreamImpl::sendHeaders(HeaderMap& headers, bool end_stream) {
}

void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) {
// Map send calls after local closure to no-ops. The send call could have been queued prior to
// remote reset or closure, and/or closure could have occurred synchronously in response to a
// previous send. In these cases the router will have already cleaned up stream state. This
// parallels handling in the main Http::ConnectionManagerImpl as well.
if (local_closed_) {
return;
}

// TODO(mattklein123): We trust callers currently to not do anything insane here if they set up
// buffering on an async client call. We should potentially think about limiting the size of
// buffering that we allow here.
Expand All @@ -138,16 +160,16 @@ void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) {
}

void AsyncStreamImpl::sendTrailers(HeaderMap& trailers) {
// See explanation in sendData.
if (local_closed_) {
return;
}

router_.decodeTrailers(trailers);
closeLocal(true);
}

void AsyncStreamImpl::closeLocal(bool end_stream) {
// TODO(goaway): This assert maybe merits reconsideration. It seems to be saying that we shouldn't
// get here when trying to send the final frame of a stream that has already been closed locally,
// but it's fine for us to get here if we're trying to send a non-final frame. There's not an
// obvious reason why the first case would be not okay but the second case okay.
ASSERT(!(local_closed_ && end_stream));
// This guard ensures that we don't attempt to clean up a stream or fire a completion callback
// for a stream that has already been closed. Both send* calls and resets can result in stream
// closure, and this state may be updated synchronously during stream interaction and callbacks.
Expand Down Expand Up @@ -216,23 +238,10 @@ AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent

void AsyncRequestImpl::initialize() {
sendHeaders(request_->headers(), !request_->body());
// AsyncRequestImpl has historically been implemented to fire onComplete immediately upon
// receiving a complete response, regardless of whether the underlying stream was fully closed (in
// other words, regardless of whether the complete request had been sent). This had the potential
// to leak half-closed streams, which is now covered by manually firing closeLocal below. (See
// test PoolFailureWithBody for an example execution path.)
// TODO(goaway): Consider deeper cleanup of assumptions here.
if (request_->body()) {
// sendHeaders can result in synchronous stream closure in certain cases (e.g. connection pool
// failure).
if (remoteClosed()) {
// In the case that we had a locally-generated response, we manually close the stream locally
// to fire the completion callback. This is a no-op if we had a locally-generated reset
// instead.
closeLocal(true);
} else {
sendData(*request_->body(), true);
}
// It's possible this will be a no-op due to a local response synchronously generated in
// sendHeaders; guards handle this within AsyncStreamImpl.
sendData(*request_->body(), true);
}
// TODO(mattklein123): Support request trailers.
}
Expand Down
71 changes: 71 additions & 0 deletions test/common/http/async_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,77 @@ TEST_F(AsyncClientImplTest, LocalResetAfterStreamStart) {
stream->reset();
}

TEST_F(AsyncClientImplTest, SendDataAfterRemoteClosure) {
Buffer::InstancePtr body{new Buffer::OwnedImpl("test body")};

EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](StreamDecoder& decoder,
ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* {
callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_, stream_info_);
response_decoder_ = &decoder;
return nullptr;
}));

TestHeaderMapImpl headers;
HttpTestUtility::addDefaultHeaders(headers);
headers.addCopy("x-envoy-internal", "true");
headers.addCopy("x-forwarded-for", "127.0.0.1");
headers.addCopy(":scheme", "http");

EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&headers), false));

TestHeaderMapImpl expected_headers{{":status", "200"}};
EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_headers), false));
EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true));
EXPECT_CALL(stream_callbacks_, onComplete());

AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions());
stream->sendHeaders(headers, false);

response_decoder_->decodeHeaders(HeaderMapPtr(new TestHeaderMapImpl{{":status", "200"}}), false);
response_decoder_->decodeData(*body, true);

EXPECT_CALL(stream_encoder_, encodeData(_, _)).Times(0);
stream->sendData(*body, true);
}

TEST_F(AsyncClientImplTest, SendTrailersRemoteClosure) {
Buffer::InstancePtr body{new Buffer::OwnedImpl("test body")};

EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](StreamDecoder& decoder,
ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* {
callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_, stream_info_);
response_decoder_ = &decoder;
return nullptr;
}));

TestHeaderMapImpl headers;
HttpTestUtility::addDefaultHeaders(headers);
headers.addCopy("x-envoy-internal", "true");
headers.addCopy("x-forwarded-for", "127.0.0.1");
headers.addCopy(":scheme", "http");

TestHeaderMapImpl trailers;
trailers.addCopy("x-test-trailer", "1");

EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&headers), false));

TestHeaderMapImpl expected_headers{{":status", "200"}};
EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_headers), false));
EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true));
EXPECT_CALL(stream_callbacks_, onComplete());

AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions());
stream->sendHeaders(headers, false);

response_decoder_->decodeHeaders(HeaderMapPtr(new TestHeaderMapImpl{{":status", "200"}}), false);
response_decoder_->decodeData(*body, true);

EXPECT_CALL(stream_encoder_, encodeTrailers(_)).Times(0);
stream->sendTrailers(trailers);
}

// Validate behavior when the stream's onHeaders() callback performs a stream
// reset.
TEST_F(AsyncClientImplTest, ResetInOnHeaders) {
Expand Down

0 comments on commit bfd0764

Please sign in to comment.