From bfd0764dc9296da52df7ffb728eabd2a2acbcc5a Mon Sep 17 00:00:00 2001 From: Mike Schore Date: Thu, 3 Oct 2019 18:09:40 -0700 Subject: [PATCH] http: fully close streams on remote close in AsyncClient (#8467) Signed-off-by: Mike Schore --- source/common/http/async_client_impl.cc | 51 +++++++++------- test/common/http/async_client_impl_test.cc | 71 ++++++++++++++++++++++ 2 files changed, 101 insertions(+), 21 deletions(-) diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index 709d48cc319c..e69f02ef0f66 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -93,6 +93,14 @@ void AsyncStreamImpl::encodeHeaders(HeaderMapPtr&& headers, bool end_stream) { ASSERT(!remote_closed_); stream_callbacks_.onHeaders(std::move(headers), end_stream); closeRemote(end_stream); + // At present, the router cleans up stream state as soon as the remote is closed, making a + // half-open local stream unsupported and dangerous. Ensure we close locally to trigger completion + // and keep things consistent. Another option would be to issue a stream reset here if local isn't + // yet closed, triggering cleanup along a more standardized path. However, this would require + // additional logic to handle the response completion and subsequent reset, and run the risk of + // being interpreted as a failure, when in fact no error has necessarily occurred. Gracefully + // closing seems most in-line with behavior elsewhere in Envoy for now. + closeLocal(end_stream); } void AsyncStreamImpl::encodeData(Buffer::Instance& data, bool end_stream) { @@ -101,6 +109,9 @@ void AsyncStreamImpl::encodeData(Buffer::Instance& data, bool end_stream) { ASSERT(!remote_closed_); stream_callbacks_.onData(data, end_stream); closeRemote(end_stream); + // Ensure we close locally on receiving a complete response; see comment in encodeHeaders for + // rationale. + closeLocal(end_stream); } void AsyncStreamImpl::encodeTrailers(HeaderMapPtr&& trailers) { @@ -108,6 +119,9 @@ void AsyncStreamImpl::encodeTrailers(HeaderMapPtr&& trailers) { ASSERT(!remote_closed_); stream_callbacks_.onTrailers(std::move(trailers)); closeRemote(true); + // Ensure we close locally on receiving a complete response; see comment in encodeHeaders for + // rationale. + closeLocal(true); } void AsyncStreamImpl::sendHeaders(HeaderMap& headers, bool end_stream) { @@ -126,6 +140,14 @@ void AsyncStreamImpl::sendHeaders(HeaderMap& headers, bool end_stream) { } void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) { + // Map send calls after local closure to no-ops. The send call could have been queued prior to + // remote reset or closure, and/or closure could have occurred synchronously in response to a + // previous send. In these cases the router will have already cleaned up stream state. This + // parallels handling in the main Http::ConnectionManagerImpl as well. + if (local_closed_) { + return; + } + // TODO(mattklein123): We trust callers currently to not do anything insane here if they set up // buffering on an async client call. We should potentially think about limiting the size of // buffering that we allow here. @@ -138,16 +160,16 @@ void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) { } void AsyncStreamImpl::sendTrailers(HeaderMap& trailers) { + // See explanation in sendData. + if (local_closed_) { + return; + } + router_.decodeTrailers(trailers); closeLocal(true); } void AsyncStreamImpl::closeLocal(bool end_stream) { - // TODO(goaway): This assert maybe merits reconsideration. It seems to be saying that we shouldn't - // get here when trying to send the final frame of a stream that has already been closed locally, - // but it's fine for us to get here if we're trying to send a non-final frame. There's not an - // obvious reason why the first case would be not okay but the second case okay. - ASSERT(!(local_closed_ && end_stream)); // This guard ensures that we don't attempt to clean up a stream or fire a completion callback // for a stream that has already been closed. Both send* calls and resets can result in stream // closure, and this state may be updated synchronously during stream interaction and callbacks. @@ -216,23 +238,10 @@ AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent void AsyncRequestImpl::initialize() { sendHeaders(request_->headers(), !request_->body()); - // AsyncRequestImpl has historically been implemented to fire onComplete immediately upon - // receiving a complete response, regardless of whether the underlying stream was fully closed (in - // other words, regardless of whether the complete request had been sent). This had the potential - // to leak half-closed streams, which is now covered by manually firing closeLocal below. (See - // test PoolFailureWithBody for an example execution path.) - // TODO(goaway): Consider deeper cleanup of assumptions here. if (request_->body()) { - // sendHeaders can result in synchronous stream closure in certain cases (e.g. connection pool - // failure). - if (remoteClosed()) { - // In the case that we had a locally-generated response, we manually close the stream locally - // to fire the completion callback. This is a no-op if we had a locally-generated reset - // instead. - closeLocal(true); - } else { - sendData(*request_->body(), true); - } + // It's possible this will be a no-op due to a local response synchronously generated in + // sendHeaders; guards handle this within AsyncStreamImpl. + sendData(*request_->body(), true); } // TODO(mattklein123): Support request trailers. } diff --git a/test/common/http/async_client_impl_test.cc b/test/common/http/async_client_impl_test.cc index 66845e6ced11..a9190d11c2c8 100644 --- a/test/common/http/async_client_impl_test.cc +++ b/test/common/http/async_client_impl_test.cc @@ -562,6 +562,77 @@ TEST_F(AsyncClientImplTest, LocalResetAfterStreamStart) { stream->reset(); } +TEST_F(AsyncClientImplTest, SendDataAfterRemoteClosure) { + Buffer::InstancePtr body{new Buffer::OwnedImpl("test body")}; + + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder& decoder, + ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_, stream_info_); + response_decoder_ = &decoder; + return nullptr; + })); + + TestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + headers.addCopy("x-envoy-internal", "true"); + headers.addCopy("x-forwarded-for", "127.0.0.1"); + headers.addCopy(":scheme", "http"); + + EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&headers), false)); + + TestHeaderMapImpl expected_headers{{":status", "200"}}; + EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_headers), false)); + EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true)); + EXPECT_CALL(stream_callbacks_, onComplete()); + + AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions()); + stream->sendHeaders(headers, false); + + response_decoder_->decodeHeaders(HeaderMapPtr(new TestHeaderMapImpl{{":status", "200"}}), false); + response_decoder_->decodeData(*body, true); + + EXPECT_CALL(stream_encoder_, encodeData(_, _)).Times(0); + stream->sendData(*body, true); +} + +TEST_F(AsyncClientImplTest, SendTrailersRemoteClosure) { + Buffer::InstancePtr body{new Buffer::OwnedImpl("test body")}; + + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder& decoder, + ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_, stream_info_); + response_decoder_ = &decoder; + return nullptr; + })); + + TestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + headers.addCopy("x-envoy-internal", "true"); + headers.addCopy("x-forwarded-for", "127.0.0.1"); + headers.addCopy(":scheme", "http"); + + TestHeaderMapImpl trailers; + trailers.addCopy("x-test-trailer", "1"); + + EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&headers), false)); + + TestHeaderMapImpl expected_headers{{":status", "200"}}; + EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_headers), false)); + EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true)); + EXPECT_CALL(stream_callbacks_, onComplete()); + + AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions()); + stream->sendHeaders(headers, false); + + response_decoder_->decodeHeaders(HeaderMapPtr(new TestHeaderMapImpl{{":status", "200"}}), false); + response_decoder_->decodeData(*body, true); + + EXPECT_CALL(stream_encoder_, encodeTrailers(_)).Times(0); + stream->sendTrailers(trailers); +} + // Validate behavior when the stream's onHeaders() callback performs a stream // reset. TEST_F(AsyncClientImplTest, ResetInOnHeaders) {