Skip to content

Commit

Permalink
Extend AsyncClient to support streaming requests / responses (#353)
Browse files Browse the repository at this point in the history
  • Loading branch information
lizan authored and mattklein123 committed Feb 2, 2017
1 parent 02c6fc9 commit 14d3e97
Show file tree
Hide file tree
Showing 6 changed files with 721 additions and 88 deletions.
88 changes: 87 additions & 1 deletion include/envoy/http/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,45 @@ class AsyncClient {
};

/**
* An in-flight HTTP request
* Notifies caller of async HTTP stream status.
* Note the HTTP stream is full-duplex, even if the local to remote stream has been ended
* by Stream.sendHeaders/sendData with end_stream=true or sendTrailers,
* StreamCallbacks can continue to receive events until the remote to local stream is closed,
* and vice versa.
*/
class StreamCallbacks {
public:
virtual ~StreamCallbacks() {}

/**
* Called when all headers get received on the async HTTP stream.
* @param headers the headers received
* @param end_stream whether the response is header only
*/
virtual void onHeaders(HeaderMapPtr&& headers, bool end_stream) PURE;

/**
* Called when a data frame get received on the async HTTP stream.
* This can be invoked multiple times if the data get streamed.
* @param data the data received
* @param end_stream whether the data is the last data frame
*/
virtual void onData(Buffer::Instance& data, bool end_stream) PURE;

/**
* Called when all trailers get received on the async HTTP stream.
* @param trailers the trailers received.
*/
virtual void onTrailers(HeaderMapPtr&& trailers) PURE;

/**
* Called when the async HTTP stream is reset.
*/
virtual void onReset() PURE;
};

/**
* An in-flight HTTP request.
*/
class Request {
public:
Expand All @@ -50,18 +88,66 @@ class AsyncClient {
virtual void cancel() PURE;
};

/**
* An in-flight HTTP stream.
*/
class Stream {
public:
virtual ~Stream() {}

/***
* Send headers to the stream. This method cannot be invoked more than once and
* need to be called before sendData.
* @param headers supplies the headers to send.
* @param end_stream supplies whether this is a header only request.
*/
virtual void sendHeaders(HeaderMap& headers, bool end_stream) PURE;

/***
* Send data to the stream. This method can be invoked multiple times if it get streamed.
* To end the stream without data, call this method with empty buffer.
* @param data supplies the data to send.
* @param end_stream supplies whether this is the last data.
*/
virtual void sendData(Buffer::Instance& data, bool end_stream) PURE;

/***
* Send trailers. This method cannot be invoked more than once, and implicitly ends the stream.
* @param trailers supplies the trailers to send.
*/
virtual void sendTrailers(HeaderMap& trailers) PURE;

/***
* Reset the stream.
*/
virtual void reset() PURE;
};

virtual ~AsyncClient() {}

/**
* Send an HTTP request asynchronously
* @param request the request to send.
* @param callbacks the callbacks to be notified of request status.
* @param timeout supplies the request timeout
* @return a request handle or nullptr if no request could be created. NOTE: In this case
* onFailure() has already been called inline. The client owns the request and the
* handle should just be used to cancel.
*/
virtual Request* send(MessagePtr&& request, Callbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) PURE;

/**
* Start an HTTP stream asynchronously.
* @param callbacks the callbacks to be notified of stream status.
* @param timeout supplies the stream timeout, measured since when the frame with end_stream
* flag is sent until when the first frame is received.
* @return a stream handle or nullptr if no stream could be started. NOTE: In this case
* onResetStream() has already been called inline. The client owns the stream and
* the handle can be used to send more messages or close the stream.
*/
virtual Stream* start(StreamCallbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) PURE;
};

typedef std::unique_ptr<AsyncClient> AsyncClientPtr;
Expand Down
196 changes: 129 additions & 67 deletions source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
#include "async_client_impl.h"
#include "headers.h"

namespace Http {

const std::vector<std::reference_wrapper<const Router::RateLimitPolicyEntry>>
AsyncRequestImpl::NullRateLimitPolicy::rate_limit_policy_entry_;
const AsyncRequestImpl::NullRateLimitPolicy AsyncRequestImpl::RouteEntryImpl::rate_limit_policy_;
const AsyncRequestImpl::NullRetryPolicy AsyncRequestImpl::RouteEntryImpl::retry_policy_;
const AsyncRequestImpl::NullShadowPolicy AsyncRequestImpl::RouteEntryImpl::shadow_policy_;
const AsyncRequestImpl::NullVirtualHost AsyncRequestImpl::RouteEntryImpl::virtual_host_;
const AsyncRequestImpl::NullRateLimitPolicy AsyncRequestImpl::NullVirtualHost::rate_limit_policy_;
AsyncStreamImpl::NullRateLimitPolicy::rate_limit_policy_entry_;
const AsyncStreamImpl::NullRateLimitPolicy AsyncStreamImpl::RouteEntryImpl::rate_limit_policy_;
const AsyncStreamImpl::NullRetryPolicy AsyncStreamImpl::RouteEntryImpl::retry_policy_;
const AsyncStreamImpl::NullShadowPolicy AsyncStreamImpl::RouteEntryImpl::shadow_policy_;
const AsyncStreamImpl::NullVirtualHost AsyncStreamImpl::RouteEntryImpl::virtual_host_;
const AsyncStreamImpl::NullRateLimitPolicy AsyncStreamImpl::NullVirtualHost::rate_limit_policy_;

AsyncClientImpl::AsyncClientImpl(const Upstream::ClusterInfo& cluster, Stats::Store& stats_store,
Event::Dispatcher& dispatcher,
Expand All @@ -22,120 +21,183 @@ AsyncClientImpl::AsyncClientImpl(const Upstream::ClusterInfo& cluster, Stats::St
dispatcher_(dispatcher) {}

AsyncClientImpl::~AsyncClientImpl() {
while (!active_requests_.empty()) {
active_requests_.front()->failDueToClientDestroy();
while (!active_streams_.empty()) {
active_streams_.front()->reset();
}
}

AsyncClient::Request* AsyncClientImpl::send(MessagePtr&& request, AsyncClient::Callbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) {
std::unique_ptr<AsyncRequestImpl> new_request{
new AsyncRequestImpl(std::move(request), *this, callbacks, timeout)};
AsyncRequestImpl* async_request =
new AsyncRequestImpl(std::move(request), *this, callbacks, timeout);
std::unique_ptr<AsyncStreamImpl> new_request{async_request};

// The request may get immediately failed. If so, we will return nullptr.
if (!new_request->complete_) {
new_request->moveIntoList(std::move(new_request), active_requests_);
return active_requests_.front().get();
if (!new_request->remote_closed_) {
new_request->moveIntoList(std::move(new_request), active_streams_);
return async_request;
} else {
return nullptr;
}
}

AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent,
AsyncClient::Callbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout)
: request_(std::move(request)), parent_(parent), callbacks_(callbacks),
stream_id_(parent.config_.random_.random()), router_(parent.config_),
request_info_(Protocol::Http11), route_(parent_.cluster_.name(), timeout) {
AsyncClient::Stream* AsyncClientImpl::start(AsyncClient::StreamCallbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) {
std::unique_ptr<AsyncStreamImpl> new_stream{new AsyncStreamImpl(*this, callbacks, timeout)};

router_.setDecoderFilterCallbacks(*this);
request_->headers().insertEnvoyInternalRequest().value(
Headers::get().EnvoyInternalRequestValues.True);
request_->headers().insertForwardedFor().value(parent_.config_.local_info_.address());
router_.decodeHeaders(request_->headers(), !request_->body());
if (!complete_ && request_->body()) {
router_.decodeData(*request_->body(), true);
// The request may get immediately failed. If so, we will return nullptr.
if (!new_stream->remote_closed_) {
new_stream->moveIntoList(std::move(new_stream), active_streams_);
return active_streams_.front().get();
} else {
return nullptr;
}
}

// TODO: Support request trailers.
AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout)
: parent_(parent), stream_callbacks_(callbacks), stream_id_(parent.config_.random_.random()),
router_(parent.config_), request_info_(Protocol::Http11),
route_(parent_.cluster_.name(), timeout) {

router_.setDecoderFilterCallbacks(*this);
// TODO: Correctly set protocol in request info when we support access logging.
}

AsyncRequestImpl::~AsyncRequestImpl() { ASSERT(!reset_callback_); }
AsyncStreamImpl::~AsyncStreamImpl() { ASSERT(!reset_callback_); }

void AsyncRequestImpl::encodeHeaders(HeaderMapPtr&& headers, bool end_stream) {
response_.reset(new ResponseMessageImpl(std::move(headers)));
void AsyncStreamImpl::encodeHeaders(HeaderMapPtr&& headers, bool end_stream) {
#ifndef NDEBUG
log_debug("async http request response headers (end_stream={}):", end_stream);
response_->headers().iterate([](const HeaderEntry& header, void*) -> void {
headers->iterate([](const HeaderEntry& header, void*) -> void {
log_debug(" '{}':'{}'", header.key().c_str(), header.value().c_str());
}, nullptr);
#endif

if (end_stream) {
onComplete();
}
stream_callbacks_.onHeaders(std::move(headers), end_stream);
closeRemote(end_stream);
}

void AsyncRequestImpl::encodeData(Buffer::Instance& data, bool end_stream) {
void AsyncStreamImpl::encodeData(Buffer::Instance& data, bool end_stream) {
log_trace("async http request response data (length={} end_stream={})", data.length(),
end_stream);
if (!response_->body()) {
response_->body(Buffer::InstancePtr{new Buffer::OwnedImpl()});
}
response_->body()->move(data);

if (end_stream) {
onComplete();
}
stream_callbacks_.onData(data, end_stream);
closeRemote(end_stream);
}

void AsyncRequestImpl::encodeTrailers(HeaderMapPtr&& trailers) {
response_->trailers(std::move(trailers));
void AsyncStreamImpl::encodeTrailers(HeaderMapPtr&& trailers) {
#ifndef NDEBUG
log_debug("async http request response trailers:");
response_->trailers()->iterate([](const HeaderEntry& header, void*) -> void {
trailers->iterate([](const HeaderEntry& header, void*) -> void {
log_debug(" '{}':'{}'", header.key().c_str(), header.value().c_str());
}, nullptr);
#endif

onComplete();
stream_callbacks_.onTrailers(std::move(trailers));
closeRemote(true);
}

void AsyncRequestImpl::cancel() {
reset_callback_();
cleanup();
void AsyncStreamImpl::sendHeaders(HeaderMap& headers, bool end_stream) {
headers.insertEnvoyInternalRequest().value(Headers::get().EnvoyInternalRequestValues.True);
headers.insertForwardedFor().value(parent_.config_.local_info_.address());
router_.decodeHeaders(headers, end_stream);
closeLocal(end_stream);
}

void AsyncRequestImpl::onComplete() {
complete_ = true;
callbacks_.onSuccess(std::move(response_));
cleanup();
void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) {
router_.decodeData(data, end_stream);
closeLocal(end_stream);
}

void AsyncStreamImpl::sendTrailers(HeaderMap& trailers) {
router_.decodeTrailers(trailers);
closeLocal(true);
}

void AsyncStreamImpl::closeLocal(bool end_stream) {
ASSERT(!(local_closed_ && end_stream));

local_closed_ |= end_stream;
if (complete()) {
cleanup();
}
}

void AsyncStreamImpl::closeRemote(bool end_stream) {
remote_closed_ |= end_stream;
if (complete()) {
cleanup();
}
}

void AsyncRequestImpl::cleanup() {
response_.reset();
void AsyncStreamImpl::reset() {
reset_callback_();
resetStream();
}

void AsyncStreamImpl::cleanup() {
reset_callback_ = nullptr;

// This will destroy us, but only do so if we are actually in a list. This does not happen in
// the immediate failure case.
if (inserted()) {
removeFromList(parent_.active_requests_);
removeFromList(parent_.active_streams_);
}
}

void AsyncRequestImpl::resetStream() {
// In this case we don't have a valid response so we do need to raise a failure.
callbacks_.onFailure(AsyncClient::FailureReason::Reset);
void AsyncStreamImpl::resetStream() {
stream_callbacks_.onReset();
cleanup();
}

void AsyncRequestImpl::failDueToClientDestroy() {
// In this case we are going away because the client is being destroyed. We need to both reset
// the stream as well as raise a failure callback.
reset_callback_();
callbacks_.onFailure(AsyncClient::FailureReason::Reset);
cleanup();
AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent,
AsyncClient::Callbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout)
: AsyncStreamImpl(parent, *this, timeout), request_(std::move(request)), callbacks_(callbacks) {

sendHeaders(request_->headers(), !request_->body());
if (!complete() && request_->body()) {
sendData(*request_->body(), true);
}
// TODO: Support request trailers.
}

void AsyncRequestImpl::onComplete() { callbacks_.onSuccess(std::move(response_)); }

void AsyncRequestImpl::onHeaders(HeaderMapPtr&& headers, bool end_stream) {
response_.reset(new ResponseMessageImpl(std::move(headers)));

if (end_stream) {
onComplete();
}
}

void AsyncRequestImpl::onData(Buffer::Instance& data, bool end_stream) {
if (!response_->body()) {
response_->body(Buffer::InstancePtr{new Buffer::OwnedImpl()});
}
response_->body()->move(data);

if (end_stream) {
onComplete();
}
}

void AsyncRequestImpl::onTrailers(HeaderMapPtr&& trailers) {
response_->trailers(std::move(trailers));
onComplete();
}

void AsyncRequestImpl::onReset() {
if (!cancelled_) {
// In this case we don't have a valid response so we do need to raise a failure.
callbacks_.onFailure(AsyncClient::FailureReason::Reset);
}
}

void AsyncRequestImpl::cancel() {
cancelled_ = true;
reset();
}

} // Http
Loading

0 comments on commit 14d3e97

Please sign in to comment.