Skip to content

Commit

Permalink
Ext_proc: Enable sending body without waiting for header response in …
Browse files Browse the repository at this point in the history
…STREAMED mode (envoyproxy#35850)


---------

Signed-off-by: Yanjun Xiang <[email protected]>
  • Loading branch information
yanjunxiang-google authored Sep 25, 2024
1 parent 41a3783 commit 1a15316
Show file tree
Hide file tree
Showing 9 changed files with 621 additions and 74 deletions.
18 changes: 10 additions & 8 deletions api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -271,18 +271,20 @@ message ExternalProcessor {
// The default value is 5000 milliseconds (5 seconds) if not specified.
google.protobuf.Duration deferred_close_timeout = 19;

// [#not-implemented-hide:]
// Send body to the side stream server once it arrives without waiting for the header response from that server.
// It only works for STREAMED body processing mode. For any other body processing modes, it is ignored.
//
// The server has two options upon receiving a header request:
// 1. Instant Response: Send the header response as soon as the header request is received.
// 2. Delayed Response: Wait for the body before sending any response.
// If the server chooses the second option, it has two further choices:
// 2.1 Separate Responses: Send the header response first, followed by separate body responses.
// 2.2 Combined Response: Include both the header response and the first chunk of the body response
// in a single body response message, followed by the remaining body responses.
//
// 1. Instant Response: send the header response as soon as the header request is received.
//
// 2. Delayed Response: wait for the body before sending any response.
//
// In all scenarios, the header-body ordering must always be maintained.
//
// If enabled Envoy will ignore the
// :ref:`mode_override <envoy_v3_api_field_service.ext_proc.v3.ProcessingResponse.mode_override>`
// value that the server sends in the header response. This is because Envoy may have already
// sent the body to the server, prior to processing the header response.
bool send_body_without_waiting_for_header_response = 21;

// When :ref:`allow_mode_override
Expand Down
8 changes: 4 additions & 4 deletions api/envoy/service/ext_proc/v3/external_processor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,10 @@ message ProcessingResponse {
// It is also ignored by Envoy when the ext_proc filter config
// :ref:`allow_mode_override
// <envoy_v3_api_field_extensions.filters.http.ext_proc.v3.ExternalProcessor.allow_mode_override>`
// is set to false.
// is set to false, or
// :ref:`send_body_without_waiting_for_header_response
// <envoy_v3_api_field_extensions.filters.http.ext_proc.v3.ExternalProcessor.send_body_without_waiting_for_header_response>`
// is set to true.
envoy.extensions.filters.http.ext_proc.v3.ProcessingMode mode_override = 9;

// When ext_proc server receives a request message, in case it needs more
Expand Down Expand Up @@ -285,9 +288,6 @@ message CommonResponse {
// Instructions on how to manipulate the headers. When responding to an
// HttpBody request, header mutations will only take effect if
// the current processing mode for the body is BUFFERED.
// [#comment:TODO(yanjunxiang-google) rephrase last sentence once send_body_without_waiting_for_header_response is not hidden:
// the current processing mode for the body is: 1) BUFFERED; 2) or STREAMED and
// the :ref:`send_body_without_waiting_for_header_response <envoy_v3_api_field_extensions.filters.http.ext_proc.v3.ExternalProcessor.send_body_without_waiting_for_header_response>` is enabled.]
HeaderMutation header_mutation = 2;

// Replace the body of the last message sent to the remote server on this
Expand Down
4 changes: 4 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ minor_behavior_changes:
change: |
When Lua script executes httpCall, backpressure is exercised when receiving body from downstream client. This behavior can be reverted
by setting the runtime guard ``envoy.reloadable_features.lua_flow_control_while_http_call`` to false.
- area: ext_proc
change: |
Added support for :ref:`send_body_without_waiting_for_header_response
<envoy_v3_api_field_extensions.filters.http.ext_proc.v3.ExternalProcessor.send_body_without_waiting_for_header_response>`.
- area: http
change: |
Modified the authority header value validator to allow the same characters as oghttp2
Expand Down
46 changes: 29 additions & 17 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ FilterConfig::FilterConfig(
deferred_close_timeout_(PROTOBUF_GET_MS_OR_DEFAULT(config, deferred_close_timeout,
DEFAULT_DEFERRED_CLOSE_TIMEOUT_MS)),
message_timeout_(message_timeout), max_message_timeout_ms_(max_message_timeout_ms),
send_body_without_waiting_for_header_response_(
config.send_body_without_waiting_for_header_response()),
stats_(generateStats(stats_prefix, config.stat_prefix(), scope)),
processing_mode_(config.processing_mode()),
mutation_checker_(config.mutation_rules(), context.regexEngine()),
Expand Down Expand Up @@ -495,16 +497,22 @@ FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& data, b
}

if (state.callbackState() == ProcessorState::CallbackState::HeadersCallback) {
ENVOY_LOG(trace, "Header processing still in progress -- holding body data");
// We don't know what to do with the body until the response comes back.
// We must buffer it in case we need it when that happens.
// Raise a watermark to prevent a buffer overflow until the response comes back.
// When end_stream is true, we need to StopIterationAndWatermark as well to stop the
// ActiveStream from returning error when the last chunk added to stream buffer exceeds the
// buffer limit.
state.setPaused(true);
state.requestWatermark();
return FilterDataStatus::StopIterationAndWatermark;
if (state.bodyMode() == ProcessingMode::STREAMED &&
config_->sendBodyWithoutWaitingForHeaderResponse()) {
ENVOY_LOG(trace, "Sending body data even header processing is still in progress as body mode "
"is STREAMED and send_body_without_waiting_for_header_response is enabled");
} else {
ENVOY_LOG(trace, "Header processing still in progress -- holding body data");
// We don't know what to do with the body until the response comes back.
// We must buffer it in case we need it when that happens.
// Raise a watermark to prevent a buffer overflow until the response comes back.
// When end_stream is true, we need to StopIterationAndWatermark as well to stop the
// ActiveStream from returning error when the last chunk added to stream buffer exceeds the
// buffer limit.
state.setPaused(true);
state.requestWatermark();
return FilterDataStatus::StopIterationAndWatermark;
}
}

FilterDataStatus result;
Expand Down Expand Up @@ -566,11 +574,13 @@ FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& data, b
// Need to first enqueue the data into the chunk queue before sending.
auto req = setupBodyChunk(state, data, end_stream);
state.enqueueStreamingChunk(data, end_stream);
sendBodyChunk(state, ProcessorState::CallbackState::StreamedBodyCallback, req);

// At this point we will continue, but with no data, because that will come later
if (end_stream) {
// But we need to stop iteration for the last chunk because it's our last chance to do stuff
// If the current state is HeadersCallback, stays in that state.
if (state.callbackState() == ProcessorState::CallbackState::HeadersCallback) {
sendBodyChunk(state, ProcessorState::CallbackState::HeadersCallback, req);
} else {
sendBodyChunk(state, ProcessorState::CallbackState::StreamedBodyCallback, req);
}
if (end_stream || state.callbackState() == ProcessorState::CallbackState::HeadersCallback) {
state.setPaused(true);
result = FilterDataStatus::StopIterationNoBuffer;
} else {
Expand Down Expand Up @@ -1071,9 +1081,11 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {
// Update processing mode now because filter callbacks check it
// and the various "handle" methods below may result in callbacks
// being invoked in line. This only happens when filter has allow_mode_override
// set to true and filter is waiting for header processing response.
// set to true, send_body_without_waiting_for_header_response set to false,
// and filter is waiting for header processing response.
// Otherwise, the response mode_override proto field is ignored.
if (config_->allowModeOverride() && inHeaderProcessState() && response->has_mode_override()) {
if (config_->allowModeOverride() && !config_->sendBodyWithoutWaitingForHeaderResponse() &&
inHeaderProcessState() && response->has_mode_override()) {
bool mode_override_allowed = true;
const auto& mode_overide = response->mode_override();
// First, check if mode override allow-list is configured
Expand Down
5 changes: 5 additions & 0 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ class FilterConfig {

uint32_t maxMessageTimeout() const { return max_message_timeout_ms_; }

bool sendBodyWithoutWaitingForHeaderResponse() const {
return send_body_without_waiting_for_header_response_;
}

const ExtProcFilterStats& stats() const { return stats_; }

const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& processingMode() const {
Expand Down Expand Up @@ -283,6 +287,7 @@ class FilterConfig {
const std::chrono::milliseconds deferred_close_timeout_;
const std::chrono::milliseconds message_timeout_;
const uint32_t max_message_timeout_ms_;
const bool send_body_without_waiting_for_header_response_;

ExtProcFilterStats stats_;
const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode processing_mode_;
Expand Down
117 changes: 72 additions & 45 deletions source/extensions/filters/http/ext_proc/processor_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,24 +81,57 @@ bool ProcessorState::restartMessageTimer(const uint32_t message_timeout_ms) {
}
}

void ProcessorState::sendBufferedDataInStreamedMode(bool end_stream) {
// Process the data being buffered in streaming mode.
// Move the current buffer into the queue for remote processing and clear the buffered data.
if (hasBufferedData()) {
Buffer::OwnedImpl buffered_chunk;
modifyBufferedData([&buffered_chunk](Buffer::Instance& data) { buffered_chunk.move(data); });
ENVOY_LOG(debug, "Sending a chunk of buffered data ({})", buffered_chunk.length());
// Need to first enqueue the data into the chunk queue before sending.
auto req = filter_.setupBodyChunk(*this, buffered_chunk, end_stream);
enqueueStreamingChunk(buffered_chunk, end_stream);
filter_.sendBodyChunk(*this, ProcessorState::CallbackState::StreamedBodyCallback, req);
}
if (queueBelowLowLimit()) {
clearWatermark();
}
}

absl::Status ProcessorState::processHeaderMutation(const CommonResponse& common_response) {
ENVOY_LOG(debug, "Applying header mutations");
const auto mut_status = MutationUtils::applyHeaderMutations(
common_response.header_mutation(), *headers_,
common_response.status() == CommonResponse::CONTINUE_AND_REPLACE,
filter_.config().mutationChecker(), filter_.stats().rejected_header_mutations_,
shouldRemoveContentLength());
return mut_status;
}

ProcessorState::CallbackState
ProcessorState::getCallbackStateAfterHeaderResp(const CommonResponse& common_response) const {
if (bodyMode() == ProcessingMode::STREAMED &&
filter_.config().sendBodyWithoutWaitingForHeaderResponse() && !chunk_queue_.empty() &&
(common_response.status() != CommonResponse::CONTINUE_AND_REPLACE)) {
return ProcessorState::CallbackState::StreamedBodyCallback;
}
return ProcessorState::CallbackState::Idle;
}

absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& response) {
if (callback_state_ == CallbackState::HeadersCallback) {
ENVOY_LOG(debug, "applying headers response. body mode = {}",
ProcessingMode::BodySendMode_Name(body_mode_));
const auto& common_response = response.response();
if (common_response.has_header_mutation()) {
const auto mut_status = MutationUtils::applyHeaderMutations(
common_response.header_mutation(), *headers_,
common_response.status() == CommonResponse::CONTINUE_AND_REPLACE,
filter_.config().mutationChecker(), filter_.stats().rejected_header_mutations_,
shouldRemoveContentLength());
const auto mut_status = processHeaderMutation(common_response);
if (!mut_status.ok()) {
return mut_status;
}
}

clearRouteCache(common_response);
onFinishProcessorCall(Grpc::Status::Ok);
onFinishProcessorCall(Grpc::Status::Ok, getCallbackStateAfterHeaderResp(common_response));

if (common_response.status() == CommonResponse::CONTINUE_AND_REPLACE) {
ENVOY_LOG(debug, "Replacing complete message");
Expand All @@ -119,6 +152,9 @@ absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& respon
});
}
}

// In case any data left over in the chunk queue, clear them.
clearStreamingChunk();
// Once this message is received, we won't send anything more on this request
// or response to the processor. Clear flags to make sure.
body_mode_ = ProcessingMode::NONE;
Expand All @@ -129,17 +165,26 @@ absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& respon
// Fall through if there was never a body in the first place.
ENVOY_LOG(debug, "The message had no body");
} else if (complete_body_available_ && body_mode_ != ProcessingMode::NONE) {
// If we get here, then all the body data came in before the header message
// was complete, and the server wants the body. It doesn't matter whether the
// processing mode is buffered, streamed, or partially buffered.
if (bufferedData()) {
// Get here, no_body_ = false, and complete_body_available_ = true, the end_stream
// flag of decodeData() can be determined by whether the trailers are received.
// Also, bufferedData() is not nullptr means decodeData() is called, even though
// the data can be an empty chunk.
auto req = filter_.setupBodyChunk(*this, *bufferedData(), !trailers_available_);
filter_.sendBodyChunk(*this, ProcessorState::CallbackState::BufferedBodyCallback, req);
clearWatermark();
if (callback_state_ != CallbackState::StreamedBodyCallback) {
// If we get here, then all the body data came in before the header message
// was complete, and the server wants the body. It doesn't matter whether the
// processing mode is buffered, streamed, or partially buffered.
if (bufferedData()) {
// Get here, no_body_ = false, and complete_body_available_ = true, the end_stream
// flag of decodeData() can be determined by whether the trailers are received.
// Also, bufferedData() is not nullptr means decodeData() is called, even though
// the data can be an empty chunk.
auto req = filter_.setupBodyChunk(*this, *bufferedData(), !trailers_available_);
filter_.sendBodyChunk(*this, ProcessorState::CallbackState::BufferedBodyCallback, req);
clearWatermark();
return absl::OkStatus();
}
} else {
// StreamedBodyCallback state. There is pending body response.
// Check whether there is buffered data. If there is, send them.
// Do not continue filter chain here so the pending body response have chance to be
// served.
sendBufferedDataInStreamedMode(!trailers_available_);
return absl::OkStatus();
}
} else if (body_mode_ == ProcessingMode::BUFFERED) {
Expand All @@ -149,22 +194,7 @@ absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& respon
clearWatermark();
return absl::OkStatus();
} else if (body_mode_ == ProcessingMode::STREAMED) {
if (hasBufferedData()) {
// We now know that we need to process what we have buffered in streaming mode.
// Move the current buffer into the queue for remote processing and clear the
// buffered data.
Buffer::OwnedImpl buffered_chunk;
modifyBufferedData(
[&buffered_chunk](Buffer::Instance& data) { buffered_chunk.move(data); });
ENVOY_LOG(debug, "Sending first chunk using buffered data ({})", buffered_chunk.length());
// Need to first enqueue the data into the chunk queue before sending.
auto req = filter_.setupBodyChunk(*this, buffered_chunk, false);
enqueueStreamingChunk(buffered_chunk, false);
filter_.sendBodyChunk(*this, ProcessorState::CallbackState::StreamedBodyCallback, req);
}
if (queueBelowLowLimit()) {
clearWatermark();
}
sendBufferedDataInStreamedMode(false);
continueIfNecessary();
return absl::OkStatus();
} else if (body_mode_ == ProcessingMode::BUFFERED_PARTIAL) {
Expand Down Expand Up @@ -226,12 +256,7 @@ absl::Status ProcessorState::handleBodyResponse(const BodyResponse& response) {
if (callback_state_ == CallbackState::BufferedBodyCallback) {
if (common_response.has_header_mutation()) {
if (headers_ != nullptr) {
ENVOY_LOG(debug, "Applying header mutations to buffered body message");
const auto mut_status = MutationUtils::applyHeaderMutations(
common_response.header_mutation(), *headers_,
common_response.status() == CommonResponse::CONTINUE_AND_REPLACE,
filter_.config().mutationChecker(), filter_.stats().rejected_header_mutations_,
shouldRemoveContentLength());
const auto mut_status = processHeaderMutation(common_response);
if (!mut_status.ok()) {
return mut_status;
}
Expand Down Expand Up @@ -291,12 +316,7 @@ absl::Status ProcessorState::handleBodyResponse(const BodyResponse& response) {
ENVOY_BUG(chunk != nullptr, "Bad partial body callback state");
if (common_response.has_header_mutation()) {
if (headers_ != nullptr) {
ENVOY_LOG(debug, "Applying header mutations to buffered body message");
const auto mut_status = MutationUtils::applyHeaderMutations(
common_response.header_mutation(), *headers_,
common_response.status() == CommonResponse::CONTINUE_AND_REPLACE,
filter_.config().mutationChecker(), filter_.stats().rejected_header_mutations_,
shouldRemoveContentLength());
const auto mut_status = processHeaderMutation(common_response);
if (!mut_status.ok()) {
return mut_status;
}
Expand Down Expand Up @@ -523,6 +543,13 @@ const QueuedChunk& ChunkQueue::consolidate() {
return chunk;
}

void ChunkQueue::clear() {
if (queue_.size() > 1) {
received_data_.drain(received_data_.length());
queue_.clear();
}
}

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
Expand Down
7 changes: 7 additions & 0 deletions source/extensions/filters/http/ext_proc/processor_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class ChunkQueue {
uint32_t bytesEnqueued() const { return bytes_enqueued_; }
bool empty() const { return queue_.empty(); }
void push(Buffer::Instance& data, bool end_stream);
void clear();
QueuedChunkPtr pop(Buffer::OwnedImpl& out_data);
const QueuedChunk& consolidate();
Buffer::OwnedImpl& receivedData() { return received_data_; }
Expand Down Expand Up @@ -272,6 +273,12 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {

private:
virtual void clearRouteCache(const envoy::service::ext_proc::v3::CommonResponse&) {}
void sendBufferedDataInStreamedMode(bool end_stream);
absl::Status
processHeaderMutation(const envoy::service::ext_proc::v3::CommonResponse& common_response);
void clearStreamingChunk() { chunk_queue_.clear(); }
CallbackState getCallbackStateAfterHeaderResp(
const envoy::service::ext_proc::v3::CommonResponse& common_response) const;
};

class DecodingProcessorState : public ProcessorState {
Expand Down
Loading

0 comments on commit 1a15316

Please sign in to comment.