Skip to content

Commit

Permalink
grpc: Allow to set parent context to a client to propagate stream info (
Browse files Browse the repository at this point in the history
#13356)

This patch allows to set parent context which carries the current request stream info to a gRPC async client instance.

Risk Level: Low
Testing: Added
Docs Changes: Updated
Release Notes: Added
Fixes #13345

Signed-off-by: Dhi Aurrahman <[email protected]>
  • Loading branch information
dio authored Oct 9, 2020
1 parent 4ff61b8 commit e5aa696
Show file tree
Hide file tree
Showing 28 changed files with 304 additions and 104 deletions.
8 changes: 5 additions & 3 deletions api/envoy/config/core/v3/grpc_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,10 @@ message GrpcService {
// request.
google.protobuf.Duration timeout = 3;

// Additional metadata to include in streams initiated to the GrpcService.
// This can be used for scenarios in which additional ad hoc authorization
// headers (e.g. ``x-foo-bar: baz-key``) are to be injected.
// Additional metadata to include in streams initiated to the GrpcService. This can be used for
// scenarios in which additional ad hoc authorization headers (e.g. ``x-foo-bar: baz-key``) are to
// be injected. For more information, including details on header value syntax, see the
// documentation on :ref:`custom request headers
// <config_http_conn_man_headers_custom_request_headers>`.
repeated HeaderValue initial_metadata = 5;
}
8 changes: 5 additions & 3 deletions api/envoy/config/core/v4alpha/grpc_service.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ New Features
* ext_authz filter: added support for letting the authorization server instruct Envoy to remove headers from the original request by setting the new field :ref:`headers_to_remove <envoy_v3_api_field_service.auth.v3.OkHttpResponse.headers_to_remove>` before forwarding it to the upstream.
* ext_authz filter: added support for sending :ref:`raw bytes as request body <envoy_v3_api_field_service.auth.v3.AttributeContext.HttpRequest.raw_body>` of a gRPC check request by setting :ref:`pack_as_bytes <envoy_v3_api_field_extensions.filters.http.ext_authz.v3.BufferSettings.pack_as_bytes>` to true.
* ext_authz_filter: added :ref:`disable_request_body_buffering <envoy_v3_api_field_extensions.filters.http.ext_authz.v3.CheckSettings.disable_request_body_buffering>` to disable request data buffering per-route.
* grpc: implemented header value syntax support when defining :ref:`initial metadata <envoy_v3_api_field_config.core.v3.GrpcService.initial_metadata>` for gRPC-based `ext_authz` :ref:`HTTP <envoy_v3_api_field_extensions.filters.http.ext_authz.v3.ExtAuthz.grpc_service>` and :ref:`network <envoy_v3_api_field_extensions.filters.network.ext_authz.v3.ExtAuthz.grpc_service>` filters, and :ref:`ratelimit <envoy_v3_api_field_config.ratelimit.v3.RateLimitServiceConfig.grpc_service>` filters.
* grpc-json: support specifying `response_body` field in for `google.api.HttpBody` message.
* hds: added :ref:`cluster_endpoints_health <envoy_v3_api_field_service.health.v3.EndpointHealthResponse.cluster_endpoints_health>` to HDS responses, keeping endpoints in the same groupings as they were configured in the HDS specifier by cluster and locality instead of as a flat list.
* hds: added :ref:`transport_socket_matches <envoy_v3_api_field_service.health.v3.ClusterHealthCheck.transport_socket_matches>` to HDS cluster health check specifier, so the existing match filter :ref:`transport_socket_match_criteria <envoy_v3_api_field_config.core.v3.HealthCheck.transport_socket_match_criteria>` in the repeated field :ref:`health_checks <envoy_v3_api_field_service.health.v3.ClusterHealthCheck.health_checks>` has context to match against. This unblocks support for health checks over HTTPS and HTTP/2.
Expand Down
8 changes: 5 additions & 3 deletions generated_api_shadow/envoy/config/core/v3/grpc_service.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions include/envoy/http/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "envoy/config/route/v3/route_components.pb.h"
#include "envoy/event/dispatcher.h"
#include "envoy/http/message.h"
#include "envoy/stream_info/stream_info.h"
#include "envoy/tracing/http_tracer.h"

#include "common/protobuf/protobuf.h"
Expand Down Expand Up @@ -168,6 +169,13 @@ class AsyncClient {

virtual ~AsyncClient() = default;

/**
* A context from the caller of an async client.
*/
struct ParentContext {
const StreamInfo::StreamInfo* stream_info;
};

/**
* A structure to hold the options for AsyncStream object.
*/
Expand All @@ -193,6 +201,10 @@ class AsyncClient {
hash_policy = v;
return *this;
}
StreamOptions& setParentContext(const ParentContext& v) {
parent_context = v;
return *this;
}

// For gmock test
bool operator==(const StreamOptions& src) const {
Expand All @@ -215,6 +227,9 @@ class AsyncClient {

// Provides the hash policy for hashing load balancing strategies.
Protobuf::RepeatedPtrField<envoy::config::route::v3::RouteAction::HashPolicy> hash_policy;

// Provides parent context. Currently, this holds stream info from the caller.
ParentContext parent_context;
};

/**
Expand Down Expand Up @@ -242,6 +257,10 @@ class AsyncClient {
StreamOptions::setHashPolicy(v);
return *this;
}
RequestOptions& setParentContext(const ParentContext& v) {
StreamOptions::setParentContext(v);
return *this;
}
RequestOptions& setParentSpan(Tracing::Span& parent_span) {
parent_span_ = &parent_span;
return *this;
Expand Down
12 changes: 6 additions & 6 deletions source/common/grpc/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ AsyncClientImpl::AsyncClientImpl(Upstream::ClusterManager& cm,
const envoy::config::core::v3::GrpcService& config,
TimeSource& time_source)
: cm_(cm), remote_cluster_name_(config.envoy_grpc().cluster_name()),
host_name_(config.envoy_grpc().authority()), initial_metadata_(config.initial_metadata()),
time_source_(time_source) {}
host_name_(config.envoy_grpc().authority()), time_source_(time_source),
metadata_parser_(
Router::HeaderParser::configure(config.initial_metadata(), /*append=*/false)) {}

AsyncClientImpl::~AsyncClientImpl() {
while (!active_streams_.empty()) {
Expand Down Expand Up @@ -88,10 +89,9 @@ void AsyncStreamImpl::initialize(bool buffer_body_for_retry) {
parent_.host_name_.empty() ? parent_.remote_cluster_name_ : parent_.host_name_,
service_full_name_, method_name_, options_.timeout);
// Fill service-wide initial metadata.
for (const auto& header_value : parent_.initial_metadata_) {
headers_message_->headers().addCopy(Http::LowerCaseString(header_value.key()),
header_value.value());
}
parent_.metadata_parser_->evaluateHeaders(headers_message_->headers(),
options_.parent_context.stream_info);

callbacks_.onCreateInitialMetadata(headers_message_->headers());
stream_->sendHeaders(headers_message_->headers(), false);
}
Expand Down
3 changes: 2 additions & 1 deletion source/common/grpc/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "common/grpc/codec.h"
#include "common/grpc/typed_async_client.h"
#include "common/http/async_client_impl.h"
#include "common/router/header_parser.h"

namespace Envoy {
namespace Grpc {
Expand Down Expand Up @@ -39,9 +40,9 @@ class AsyncClientImpl final : public RawAsyncClient {
const std::string remote_cluster_name_;
// The host header value in the http transport.
const std::string host_name_;
const Protobuf::RepeatedPtrField<envoy::config::core::v3::HeaderValue> initial_metadata_;
std::list<AsyncStreamImplPtr> active_streams_;
TimeSource& time_source_;
Router::HeaderParserPtr metadata_parser_;

friend class AsyncRequestImpl;
friend class AsyncStreamImpl;
Expand Down
13 changes: 6 additions & 7 deletions source/common/grpc/google_async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "common/grpc/common.h"
#include "common/grpc/google_grpc_creds_impl.h"
#include "common/grpc/google_grpc_utils.h"
#include "common/router/header_parser.h"
#include "common/tracing/http_tracer_impl.h"

#include "grpcpp/support/proto_buffer_reader.h"
Expand Down Expand Up @@ -79,9 +80,11 @@ GoogleAsyncClientImpl::GoogleAsyncClientImpl(Event::Dispatcher& dispatcher,
const envoy::config::core::v3::GrpcService& config,
Api::Api& api, const StatNames& stat_names)
: dispatcher_(dispatcher), tls_(tls), stat_prefix_(config.google_grpc().stat_prefix()),
initial_metadata_(config.initial_metadata()), scope_(scope),
scope_(scope),
per_stream_buffer_limit_bytes_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
config.google_grpc(), per_stream_buffer_limit_bytes, DefaultBufferLimitBytes)) {
config.google_grpc(), per_stream_buffer_limit_bytes, DefaultBufferLimitBytes)),
metadata_parser_(
Router::HeaderParser::configure(config.initial_metadata(), /*append=*/false)) {
// We rebuild the channel each time we construct the channel. It appears that the gRPC library is
// smart enough to do connection pooling and reuse with identical channel args, so this should
// have comparable overhead to what we are doing in Grpc::AsyncClientImpl, i.e. no expensive
Expand Down Expand Up @@ -167,12 +170,8 @@ void GoogleAsyncStreamImpl::initialize(bool /*buffer_body_for_retry*/) {
: gpr_inf_future(GPR_CLOCK_REALTIME);
ctxt_.set_deadline(abs_deadline);
// Fill service-wide initial metadata.
for (const auto& header_value : parent_.initial_metadata_) {
ctxt_.AddMetadata(header_value.key(), header_value.value());
}
// Due to the different HTTP header implementations, we effectively double
// copy headers here.
auto initial_metadata = Http::RequestHeaderMapImpl::create();
parent_.metadata_parser_->evaluateHeaders(*initial_metadata, options_.parent_context.stream_info);
callbacks_.onCreateInitialMetadata(*initial_metadata);
initial_metadata->iterate([this](const Http::HeaderEntry& header) {
ctxt_.AddMetadata(std::string(header.key().getStringView()),
Expand Down
3 changes: 2 additions & 1 deletion source/common/grpc/google_async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "common/grpc/google_grpc_context.h"
#include "common/grpc/stat_names.h"
#include "common/grpc/typed_async_client.h"
#include "common/router/header_parser.h"
#include "common/tracing/http_tracer_impl.h"

#include "absl/container/node_hash_set.h"
Expand Down Expand Up @@ -197,10 +198,10 @@ class GoogleAsyncClientImpl final : public RawAsyncClient, Logger::Loggable<Logg
GoogleStubSharedPtr stub_;
std::list<GoogleAsyncStreamImplPtr> active_streams_;
const std::string stat_prefix_;
const Protobuf::RepeatedPtrField<envoy::config::core::v3::HeaderValue> initial_metadata_;
Stats::ScopeSharedPtr scope_;
GoogleAsyncClientStats stats_;
uint64_t per_stream_buffer_limit_bytes_;
Router::HeaderParserPtr metadata_parser_;

friend class GoogleAsyncClientThreadLocal;
friend class GoogleAsyncRequestImpl;
Expand Down
26 changes: 16 additions & 10 deletions source/common/router/header_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,9 @@ HeaderParserPtr HeaderParser::configure(
for (const auto& header_value_option : headers_to_add) {
const bool append = PROTOBUF_GET_WRAPPED_OR_DEFAULT(header_value_option, append, true);
HeaderFormatterPtr header_formatter = parseInternal(header_value_option.header(), append);

header_parser->headers_to_add_.emplace_back(
Http::LowerCaseString(header_value_option.header().key()), std::move(header_formatter));
Http::LowerCaseString(header_value_option.header().key()),
HeadersToAddEntry{std::move(header_formatter), header_value_option.header().value()});
}

return header_parser;
Expand All @@ -241,9 +241,9 @@ HeaderParserPtr HeaderParser::configure(

for (const auto& header_value : headers_to_add) {
HeaderFormatterPtr header_formatter = parseInternal(header_value, append);

header_parser->headers_to_add_.emplace_back(Http::LowerCaseString(header_value.key()),
std::move(header_formatter));
header_parser->headers_to_add_.emplace_back(
Http::LowerCaseString(header_value.key()),
HeadersToAddEntry{std::move(header_formatter), header_value.value()});
}

return header_parser;
Expand All @@ -269,19 +269,25 @@ HeaderParserPtr HeaderParser::configure(

void HeaderParser::evaluateHeaders(Http::HeaderMap& headers,
const StreamInfo::StreamInfo& stream_info) const {
evaluateHeaders(headers, &stream_info);
}

void HeaderParser::evaluateHeaders(Http::HeaderMap& headers,
const StreamInfo::StreamInfo* stream_info) const {
// Removing headers in the headers_to_remove_ list first makes
// remove-before-add the default behavior as expected by users.
for (const auto& header : headers_to_remove_) {
headers.remove(header);
}

for (const auto& formatter : headers_to_add_) {
const std::string value = formatter.second->format(stream_info);
for (const auto& [key, entry] : headers_to_add_) {
const std::string value =
stream_info != nullptr ? entry.formatter_->format(*stream_info) : entry.original_value_;
if (!value.empty()) {
if (formatter.second->append()) {
headers.addReferenceKey(formatter.first, value);
if (entry.formatter_->append()) {
headers.addReferenceKey(key, value);
} else {
headers.setReferenceKey(formatter.first, value);
headers.setReferenceKey(key, value);
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion source/common/router/header_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,18 @@ class HeaderParser {
const Protobuf::RepeatedPtrField<std::string>& headers_to_remove);

void evaluateHeaders(Http::HeaderMap& headers, const StreamInfo::StreamInfo& stream_info) const;
void evaluateHeaders(Http::HeaderMap& headers, const StreamInfo::StreamInfo* stream_info) const;

protected:
HeaderParser() = default;

private:
std::vector<std::pair<Http::LowerCaseString, HeaderFormatterPtr>> headers_to_add_;
struct HeadersToAddEntry {
HeaderFormatterPtr formatter_;
const std::string original_value_;
};

std::vector<std::pair<Http::LowerCaseString, HeadersToAddEntry>> headers_to_add_;
std::vector<Http::LowerCaseString> headers_to_remove_;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@ void GrpcClientImpl::cancel() {

void GrpcClientImpl::check(RequestCallbacks& callbacks, Event::Dispatcher& dispatcher,
const envoy::service::auth::v3::CheckRequest& request,
Tracing::Span& parent_span, const StreamInfo::StreamInfo&) {
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info) {
ASSERT(callbacks_ == nullptr);
callbacks_ = &callbacks;

Http::AsyncClient::RequestOptions options;
if (timeout_.has_value()) {
if (timeoutStartsAtCheckCreation()) {
// TODO(yuval-k): We currently use dispatcher based timeout even if the underlying client is
// google gRPC client, which has it's own timeout mechanism. We may want to change that in
// the future if the implementations converge.
// Google gRPC client, which has its own timeout mechanism. We may want to change that in the
// future if the implementations converge.
timeout_timer_ = dispatcher.createTimer([this]() -> void { onTimeout(); });
timeout_timer_->enableTimer(timeout_.value());
} else {
Expand All @@ -57,6 +57,8 @@ void GrpcClientImpl::check(RequestCallbacks& callbacks, Event::Dispatcher& dispa
}
}

options.setParentContext(Http::AsyncClient::ParentContext{&stream_info});

ENVOY_LOG(trace, "Sending CheckRequest: {}", request.DebugString());
request_ = async_client_->send(service_method_, request, *this, parent_span, options,
transport_api_version_);
Expand Down
3 changes: 2 additions & 1 deletion source/extensions/filters/common/ratelimit/ratelimit.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "envoy/ratelimit/ratelimit.h"
#include "envoy/service/ratelimit/v3/rls.pb.h"
#include "envoy/singleton/manager.h"
#include "envoy/stream_info/stream_info.h"
#include "envoy/tracing/http_tracer.h"

#include "absl/types/optional.h"
Expand Down Expand Up @@ -77,7 +78,7 @@ class Client {
*/
virtual void limit(RequestCallbacks& callbacks, const std::string& domain,
const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
Tracing::Span& parent_span) PURE;
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info) PURE;
};

using ClientPtr = std::unique_ptr<Client>;
Expand Down
10 changes: 6 additions & 4 deletions source/extensions/filters/common/ratelimit/ratelimit_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,18 @@ void GrpcClientImpl::createRequest(envoy::service::ratelimit::v3::RateLimitReque

void GrpcClientImpl::limit(RequestCallbacks& callbacks, const std::string& domain,
const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
Tracing::Span& parent_span) {
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info) {
ASSERT(callbacks_ == nullptr);
callbacks_ = &callbacks;

envoy::service::ratelimit::v3::RateLimitRequest request;
createRequest(request, domain, descriptors);

request_ = async_client_->send(service_method_, request, *this, parent_span,
Http::AsyncClient::RequestOptions().setTimeout(timeout_),
transport_api_version_);
request_ =
async_client_->send(service_method_, request, *this, parent_span,
Http::AsyncClient::RequestOptions().setTimeout(timeout_).setParentContext(
Http::AsyncClient::ParentContext{&stream_info}),
transport_api_version_);
}

void GrpcClientImpl::onSuccess(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class GrpcClientImpl : public Client,
void cancel() override;
void limit(RequestCallbacks& callbacks, const std::string& domain,
const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
Tracing::Span& parent_span) override;
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info) override;

// Grpc::AsyncRequestCallbacks
void onCreateInitialMetadata(Http::RequestHeaderMap&) override {}
Expand Down
3 changes: 2 additions & 1 deletion source/extensions/filters/http/ratelimit/ratelimit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ void Filter::initiateCall(const Http::RequestHeaderMap& headers) {
if (!descriptors.empty()) {
state_ = State::Calling;
initiating_call_ = true;
client_->limit(*this, config_->domain(), descriptors, callbacks_->activeSpan());
client_->limit(*this, config_->domain(), descriptors, callbacks_->activeSpan(),
callbacks_->streamInfo());
initiating_call_ = false;
}
}
Expand Down
3 changes: 2 additions & 1 deletion source/extensions/filters/network/ratelimit/ratelimit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ Network::FilterStatus Filter::onNewConnection() {
config_->stats().active_.inc();
config_->stats().total_.inc();
calling_limit_ = true;
client_->limit(*this, config_->domain(), config_->descriptors(), Tracing::NullSpan::instance());
client_->limit(*this, config_->domain(), config_->descriptors(), Tracing::NullSpan::instance(),
filter_callbacks_->connection().streamInfo());
calling_limit_ = false;
}

Expand Down
Loading

0 comments on commit e5aa696

Please sign in to comment.