Skip to content

Commit

Permalink
Ext proc http functionality support (#35740)
Browse files Browse the repository at this point in the history
Risk Level: low
Testing: n/a
Docs Changes: n/a
Release Notes: inline
Fixes:

Description:
This is to address the issue:
#35488, i.e, integrate the
ext_proc HTTP client to ext_proc filter. With this PR, the basic
functionalities to have Envoy ext_proc filter talk to a HTTP server
using HTTP messages are accomplished.

This is the follow up of PR:
#35676

---------

Signed-off-by: Yanjun Xiang <[email protected]>
  • Loading branch information
yanjunxiang-google authored Sep 30, 2024
1 parent 8772930 commit a3e32c9
Show file tree
Hide file tree
Showing 25 changed files with 1,077 additions and 134 deletions.
1 change: 1 addition & 0 deletions api/envoy/extensions/filters/http/ext_proc/v3/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ api_proto_package(
"//envoy/config/core/v3:pkg",
"//envoy/type/matcher/v3:pkg",
"@com_github_cncf_xds//udpa/annotations:pkg",
"@com_github_cncf_xds//xds/annotations/v3:pkg",
],
)
37 changes: 35 additions & 2 deletions api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import "envoy/type/matcher/v3/string.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/struct.proto";

import "xds/annotations/v3/status.proto";

import "udpa/annotations/migrate.proto";
import "udpa/annotations/status.proto";
import "validate/validate.proto";
Expand Down Expand Up @@ -131,8 +133,39 @@ message ExternalProcessor {
// Only one of ``http_service`` or
// :ref:`grpc_service <envoy_v3_api_field_extensions.filters.http.ext_proc.v3.ExternalProcessor.grpc_service>`.
// can be set. It is required that one of them must be set.
ExtProcHttpService http_service = 20
[(udpa.annotations.field_migrate).oneof_promotion = "ext_proc_service_type"];
//
// If ``http_service`` is set, the
// :ref:`processing_mode <envoy_v3_api_field_extensions.filters.http.ext_proc.v3.ExternalProcessor.processing_mode>`
// can not be configured to send any body or trailers. i.e, http_service only supports
// sending request or response headers to the side stream server.
//
// With this configuration, Envoy behavior:
//
// 1. The headers are first put in a proto message
// :ref:`ProcessingRequest <envoy_v3_api_msg_service.ext_proc.v3.ProcessingRequest>`.
//
// 2. This proto message is then transcoded into a JSON text.
//
// 3. Envoy then sends a HTTP POST message with content-type as "application/json",
// and this JSON text as body to the side stream server.
//
// After the side-stream receives this HTTP request message, it is expected to do as follows:
//
// 1. It converts the body, which is a JSON string, into a ``ProcessingRequest``
// proto message to examine and mutate the headers.
//
// 2. It then sets the mutated headers into a new proto message
// :ref:`ProcessingResponse <envoy_v3_api_msg_service.ext_proc.v3.ProcessingResponse>`.
//
// 3. It converts ``ProcessingResponse`` proto message into a JSON text.
//
// 4. It then sends a HTTP response back to Envoy with status code as "200",
// content-type as "application/json" and sets the JSON text as the body.
//
ExtProcHttpService http_service = 20 [
(udpa.annotations.field_migrate).oneof_promotion = "ext_proc_service_type",
(xds.annotations.v3.field_status).work_in_progress = true
];

// By default, if the gRPC stream cannot be established, or if it is closed
// prematurely with an error, the filter will fail. Specifically, if the
Expand Down
13 changes: 13 additions & 0 deletions source/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ licenses(["notice"]) # Apache 2

envoy_extension_package()

envoy_cc_library(
name = "client_base_interface",
hdrs = ["client_base.h"],
tags = ["skip_on_windows"],
deps = [
"@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "ext_proc",
srcs = [
Expand All @@ -21,6 +30,7 @@ envoy_cc_library(
],
tags = ["skip_on_windows"],
deps = [
":client_base_interface",
":client_interface",
":matching_utils_lib",
":mutation_utils_lib",
Expand All @@ -34,6 +44,7 @@ envoy_cc_library(
"//source/common/runtime:runtime_features_lib",
"//source/extensions/filters/common/mutation_rules:mutation_rules_lib",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"//source/extensions/filters/http/ext_proc/http_client:http_client_lib",
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings:str_format",
"@com_google_absl//absl/strings:string_view",
Expand All @@ -53,6 +64,7 @@ envoy_cc_extension(
":client_lib",
":ext_proc",
"//source/extensions/filters/http/common:factory_base_lib",
"//source/extensions/filters/http/ext_proc/http_client:http_client_lib",
"@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto",
],
)
Expand All @@ -62,6 +74,7 @@ envoy_cc_library(
hdrs = ["client.h"],
tags = ["skip_on_windows"],
deps = [
":client_base_interface",
"//envoy/grpc:async_client_manager_interface",
"//envoy/grpc:status",
"//envoy/stream_info:stream_info_interface",
Expand Down
9 changes: 4 additions & 5 deletions source/extensions/filters/http/ext_proc/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@
#include "envoy/stream_info/stream_info.h"

#include "source/common/http/sidestream_watermark.h"
#include "source/extensions/filters/http/ext_proc/client_base.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

class ExternalProcessorStream {
class ExternalProcessorStream : public StreamBase {
public:
virtual ~ExternalProcessorStream() = default;
virtual void send(envoy::service::ext_proc::v3::ProcessingRequest&& request,
Expand All @@ -30,7 +31,7 @@ class ExternalProcessorStream {

using ExternalProcessorStreamPtr = std::unique_ptr<ExternalProcessorStream>;

class ExternalProcessorCallbacks {
class ExternalProcessorCallbacks : public RequestCallbacks {
public:
virtual ~ExternalProcessorCallbacks() = default;
virtual void onReceiveMessage(
Expand All @@ -40,16 +41,14 @@ class ExternalProcessorCallbacks {
virtual void logGrpcStreamInfo() PURE;
};

class ExternalProcessorClient {
class ExternalProcessorClient : public ClientBase {
public:
virtual ~ExternalProcessorClient() = default;
virtual ExternalProcessorStreamPtr
start(ExternalProcessorCallbacks& callbacks,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const Http::AsyncClient::StreamOptions& options,
Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) PURE;
virtual ExternalProcessorStream* stream() PURE;
virtual void setStream(ExternalProcessorStream* stream) PURE;
};

using ExternalProcessorClientPtr = std::unique_ptr<ExternalProcessorClient>;
Expand Down
47 changes: 47 additions & 0 deletions source/extensions/filters/http/ext_proc/client_base.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#pragma once

#include <memory>

#include "envoy/service/ext_proc/v3/external_processor.pb.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

/**
* Async callbacks used during external processing.
*/
class RequestCallbacks {
public:
virtual ~RequestCallbacks() = default;
virtual void onComplete(envoy::service::ext_proc::v3::ProcessingResponse& response) PURE;
virtual void onError() PURE;
};

/**
* Stream base class used during external processing.
*/
class StreamBase {
public:
virtual ~StreamBase() = default;
};

/**
* Async client base class used during external processing.
*/
class ClientBase {
public:
virtual ~ClientBase() = default;
virtual void sendRequest(envoy::service::ext_proc::v3::ProcessingRequest&& request,
bool end_stream, const uint64_t stream_id, RequestCallbacks* callbacks,
StreamBase* stream) PURE;
virtual void cancel() PURE;
};

using ClientBasePtr = std::unique_ptr<ClientBase>;

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
7 changes: 7 additions & 0 deletions source/extensions/filters/http/ext_proc/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ ExternalProcessorStreamPtr ExternalProcessorClientImpl::start(
sidestream_watermark_callbacks);
}

void ExternalProcessorClientImpl::sendRequest(
envoy::service::ext_proc::v3::ProcessingRequest&& request, bool end_stream, const uint64_t,
RequestCallbacks*, StreamBase* stream) {
ExternalProcessorStream* grpc_stream = dynamic_cast<ExternalProcessorStream*>(stream);
grpc_stream->send(std::move(request), end_stream);
}

ExternalProcessorStreamPtr ExternalProcessorStreamImpl::create(
Grpc::AsyncClient<ProcessingRequest, ProcessingResponse>&& client,
ExternalProcessorCallbacks& callbacks, const Http::AsyncClient::StreamOptions& options,
Expand Down
9 changes: 4 additions & 5 deletions source/extensions/filters/http/ext_proc/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,14 @@ class ExternalProcessorClientImpl : public ExternalProcessorClient {
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const Http::AsyncClient::StreamOptions& options,
Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) override;
ExternalProcessorStream* stream() override { return stream_; }
void setStream(ExternalProcessorStream* stream) override { stream_ = stream; }
void sendRequest(envoy::service::ext_proc::v3::ProcessingRequest&& request, bool end_stream,
const uint64_t stream_id, RequestCallbacks* callbacks,
StreamBase* stream) override;
void cancel() override {}

private:
Grpc::AsyncClientManager& client_manager_;
Stats::Scope& scope_;
// The gRPC stream to the external processor, which will be opened
// when it's time to send the first message.
ExternalProcessorStream* stream_ = nullptr;
};

class ExternalProcessorStreamImpl : public ExternalProcessorStream,
Expand Down
50 changes: 33 additions & 17 deletions source/extensions/filters/http/ext_proc/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "source/extensions/filters/common/expr/evaluator.h"
#include "source/extensions/filters/http/ext_proc/client_impl.h"
#include "source/extensions/filters/http/ext_proc/ext_proc.h"
#include "source/extensions/filters/http/ext_proc/http_client/http_client_impl.h"

namespace Envoy {
namespace Extensions {
Expand All @@ -22,15 +23,22 @@ ExternalProcessingFilterConfig::createFilterFactoryFromProtoTyped(
proto_config, std::chrono::milliseconds(message_timeout_ms), max_message_timeout_ms,
dual_info.scope, stats_prefix, dual_info.is_upstream,
Envoy::Extensions::Filters::Common::Expr::getBuilder(context), context);

return [filter_config = std::move(filter_config), grpc_service = proto_config.grpc_service(),
&context, dual_info](Http::FilterChainFactoryCallbacks& callbacks) {
auto client = std::make_unique<ExternalProcessorClientImpl>(
context.clusterManager().grpcAsyncClientManager(), dual_info.scope);

callbacks.addStreamFilter(Http::StreamFilterSharedPtr{
std::make_shared<Filter>(filter_config, std::move(client), grpc_service)});
};
if (proto_config.has_grpc_service()) {
return [filter_config = std::move(filter_config), &context,
dual_info](Http::FilterChainFactoryCallbacks& callbacks) {
auto client = std::make_unique<ExternalProcessorClientImpl>(
context.clusterManager().grpcAsyncClientManager(), dual_info.scope);
callbacks.addStreamFilter(
Http::StreamFilterSharedPtr{std::make_shared<Filter>(filter_config, std::move(client))});
};
} else {
return [proto_config = std::move(proto_config), filter_config = std::move(filter_config),
&context](Http::FilterChainFactoryCallbacks& callbacks) {
auto client = std::make_unique<ExtProcHttpClient>(proto_config, context);
callbacks.addStreamFilter(
Http::StreamFilterSharedPtr{std::make_shared<Filter>(filter_config, std::move(client))});
};
}
}

Router::RouteSpecificFilterConfigConstSharedPtr
Expand All @@ -54,14 +62,22 @@ ExternalProcessingFilterConfig::createFilterFactoryFromProtoWithServerContextTyp
server_context.scope(), stats_prefix, false,
Envoy::Extensions::Filters::Common::Expr::getBuilder(server_context), server_context);

return [filter_config = std::move(filter_config), grpc_service = proto_config.grpc_service(),
&server_context](Http::FilterChainFactoryCallbacks& callbacks) {
auto client = std::make_unique<ExternalProcessorClientImpl>(
server_context.clusterManager().grpcAsyncClientManager(), server_context.scope());

callbacks.addStreamFilter(Http::StreamFilterSharedPtr{
std::make_shared<Filter>(filter_config, std::move(client), grpc_service)});
};
if (proto_config.has_grpc_service()) {
return [filter_config = std::move(filter_config),
&server_context](Http::FilterChainFactoryCallbacks& callbacks) {
auto client = std::make_unique<ExternalProcessorClientImpl>(
server_context.clusterManager().grpcAsyncClientManager(), server_context.scope());
callbacks.addStreamFilter(
Http::StreamFilterSharedPtr{std::make_shared<Filter>(filter_config, std::move(client))});
};
} else {
return [proto_config = std::move(proto_config), filter_config = std::move(filter_config),
&server_context](Http::FilterChainFactoryCallbacks& callbacks) {
auto client = std::make_unique<ExtProcHttpClient>(proto_config, server_context);
callbacks.addStreamFilter(
Http::StreamFilterSharedPtr{std::make_shared<Filter>(filter_config, std::move(client))});
};
}
}

LEGACY_REGISTER_FACTORY(ExternalProcessingFilterConfig,
Expand Down
Loading

0 comments on commit a3e32c9

Please sign in to comment.