wasm: Add data buffering for chunks #36411

Merged
merged 6 commits into from
Oct 15, 2024
4 changes: 4 additions & 0 deletions changelogs/current.yaml
@@ -203,6 +203,10 @@ bug_fixes:
- area: tracing
change: |
Fixed a bug where the OpenTelemetry tracer exports the OTLP request even when no spans are present.
- area: wasm
change: |
Fixed a bug where a body received in chunks is not correctly dumped, resulting in
an incomplete dump and loss of the last chunk. This issue is manifested in HTTP/2.

removed_config_or_runtime:
# *Normally occurs at the end of the* :ref:`deprecation period <deprecated>`
8 changes: 7 additions & 1 deletion source/extensions/common/wasm/context.cc
@@ -1723,6 +1723,9 @@ Http::FilterDataStatus Context::decodeData(::Envoy::Buffer::Instance& data, bool
if (!in_vm_context_created_) {
return Http::FilterDataStatus::Continue;
}
if (buffering_request_body_) {
decoder_callbacks_->addDecodedData(data, false);
}
Comment on lines +1726 to +1728
Member

I think this will make the final output contain repeated pieces of data?

/wait

Contributor Author

@juanmolle, Oct 10, 2024

I wasn't familiar with that function. When I tried to understand where it is used and what for, I found this comment:
https://github.com/envoyproxy/envoy/blob/main/source/server/admin/admin_filter.cc#L25
In our case streaming is a supported scenario, and that is the reason the body is only dumped in the buffering scenario.

I think I have that scenario covered by the test I added, the one that adds '.0' at the end of the body.
If another scenario is not covered, let me know and I can add a test for it.
I made the suite run for both HTTP/1 and HTTP/2 because this issue does not happen in HTTP/1; it seems the library always sends a last chunk carrying end_of_stream with 0 bytes.

Member

addDecodedData uses move semantics, so yes, this should be safe. I will take a more detailed look tonight.
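The move-semantics point can be illustrated with a minimal standalone sketch. `ToyBuffer`, its `move` method, and `addChunk` are hypothetical names invented for this illustration; they are not Envoy's `Buffer::Instance` or `addDecodedData`. The only assumption modeled is the one stated in the comment above: adding a chunk to the buffered body drains the source, so the same bytes cannot show up twice.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical stand-in for a byte buffer; this is not Envoy's Buffer::Instance.
struct ToyBuffer {
  std::string bytes;

  // Appends everything held by `other` to this buffer and leaves `other` empty.
  void move(ToyBuffer& other) {
    assert(this != &other); // moving a buffer into itself is not modeled here
    bytes += other.bytes;
    other.bytes.clear();
  }
};

// Models "add the incoming chunk to the buffered body".
// Returns how many bytes remain in the incoming chunk afterwards.
inline std::size_t addChunk(ToyBuffer& buffered, ToyBuffer& chunk) {
  buffered.move(chunk);
  return chunk.bytes.size();
}
```

With a buffered body of "request_" and an incoming chunk of "body", `addChunk` leaves the buffered body as "request_body" and the chunk empty, which is why a later append of the (now empty) chunk would not duplicate data in this model.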

request_body_buffer_ = &data;
end_of_stream_ = end_stream;
const auto buffer = getBuffer(WasmBufferType::HttpRequestBody);
@@ -1793,6 +1796,9 @@ Http::FilterDataStatus Context::encodeData(::Envoy::Buffer::Instance& data, bool
if (!in_vm_context_created_) {
return Http::FilterDataStatus::Continue;
}
if (buffering_response_body_) {
encoder_callbacks_->addEncodedData(data, false);
}
response_body_buffer_ = &data;
end_of_stream_ = end_stream;
const auto buffer = getBuffer(WasmBufferType::HttpResponseBody);
@@ -1801,7 +1807,7 @@ Http::FilterDataStatus Context::encodeData(::Envoy::Buffer::Instance& data, bool
buffering_response_body_ = false;
switch (result) {
case Http::FilterDataStatus::Continue:
- request_body_buffer_ = nullptr;
+ response_body_buffer_ = nullptr;
Contributor

Thanks for catching this as well.

break;
case Http::FilterDataStatus::StopIterationAndBuffer:
buffering_response_body_ = true;
@@ -118,6 +118,15 @@ FilterDataStatus BodyContext::onBody(WasmBufferType type, size_t buffer_length,
}
return FilterDataStatus::StopIterationAndBuffer;

} else if (body_op_ == "SetEndOfBodies") {
logBody(type);
if (end_of_stream) {
getBufferStatus(type, &size, &flags);
setBuffer(type, size, 0, ".0");
Contributor

(optional) nit: for consistency with other operation handling in this plugin, where descriptive text (e.g. "partial.replace") is used as the data that is inserted, you might consider using ".end" or ".appended" instead of ".0".

return FilterDataStatus::Continue;
}
return FilterDataStatus::StopIterationAndBuffer;

} else {
// This is a test and the test was configured incorrectly.
logError("Invalid body test op " + body_op_);
90 changes: 71 additions & 19 deletions test/extensions/filters/http/wasm/wasm_filter_integration_test.cc
@@ -12,13 +12,21 @@ namespace {

class WasmFilterIntegrationTest
: public HttpIntegrationTest,
- public testing::TestWithParam<std::tuple<std::string, std::string, bool>> {
+ public testing::TestWithParam<
+ std::tuple<std::tuple<std::string, std::string, bool>, Http::CodecType>> {
Contributor

optional: you could define a helper function similar to wasmDualFilterTestMatrix to allow the test to use a flat tuple<std::string, std::string, bool, Http::CodecType> rather than nesting tuples, which gets a little awkward with the nested std::get<>s.
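One way to read the flat-tuple suggestion is a small helper that builds the cross product up front. The sketch below is standalone and makes several assumptions: `ToyCodec` stands in for `Http::CodecType`, and `flatMatrix`, `BaseParam`, and `FlatParam` are hypothetical names; the actual signature of `wasmDualFilterTestMatrix` is not shown in this PR and is not reproduced here.

```cpp
#include <cassert>
#include <string>
#include <tuple>
#include <vector>

// Hypothetical stand-in for Http::CodecType, to keep the sketch self-contained.
enum class ToyCodec { HTTP1, HTTP2 };

// (runtime, language, downstream), as in the existing test parameters.
using BaseParam = std::tuple<std::string, std::string, bool>;
// The same fields with the codec appended, without nesting.
using FlatParam = std::tuple<std::string, std::string, bool, ToyCodec>;

// Builds every (base, codec) combination as a flat 4-tuple.
inline std::vector<FlatParam> flatMatrix(const std::vector<BaseParam>& base,
                                         const std::vector<ToyCodec>& codecs) {
  std::vector<FlatParam> out;
  out.reserve(base.size() * codecs.size());
  for (const auto& b : base) {
    for (const ToyCodec codec : codecs) {
      // tuple_cat joins the 3-tuple and a 1-tuple into one 4-tuple.
      out.push_back(std::tuple_cat(b, std::make_tuple(codec)));
    }
  }
  assert(out.size() == base.size() * codecs.size());
  return out;
}
```

A vector built this way could be passed to GoogleTest's `testing::ValuesIn`, so test bodies would read fields with a single `std::get<N>(GetParam())` instead of nested calls.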

public:
WasmFilterIntegrationTest()
- : HttpIntegrationTest(Http::CodecType::HTTP1, Network::Address::IpVersion::v4) {}
+ : HttpIntegrationTest(std::get<1>(GetParam()), Network::Address::IpVersion::v4) {}

void SetUp() override {
- setUpstreamProtocol(Http::CodecType::HTTP1);
+ setUpstreamProtocol(std::get<1>(GetParam()));
+ if (std::get<1>(GetParam()) == Http::CodecType::HTTP2) {
+ config_helper_.setClientCodec(envoy::extensions::filters::network::http_connection_manager::
+ v3::HttpConnectionManager::HTTP2);
+ } else {
+ config_helper_.setClientCodec(envoy::extensions::filters::network::http_connection_manager::
+ v3::HttpConnectionManager::HTTP1);
+ }
// Wasm filters are expensive to setup and sometime default is not enough,
// It needs to increase timeout to avoid flaky tests
setListenersBoundTimeout(10 * TestUtility::DefaultTimeout);
@@ -27,7 +35,7 @@ class WasmFilterIntegrationTest
void TearDown() override { fake_upstream_connection_.reset(); }

void setupWasmFilter(const std::string& config, const std::string& root_id = "") {
- bool downstream = std::get<2>(GetParam());
+ bool downstream = std::get<2>(std::get<0>(GetParam()));
const std::string yaml = TestEnvironment::substitute(absl::StrCat(
R"EOF(
name: envoy.filters.http.wasm
@@ -40,7 +48,7 @@ class WasmFilterIntegrationTest
vm_config:
vm_id: "vm_id"
runtime: envoy.wasm.runtime.)EOF",
- std::get<0>(GetParam()), R"EOF(
+ std::get<0>(std::get<0>(GetParam())), R"EOF(
configuration:
"@type": type.googleapis.com/google.protobuf.StringValue
value: )EOF",
@@ -68,11 +76,12 @@ class WasmFilterIntegrationTest
});
}

- void runTest(const Http::RequestHeaderMap& request_headers, const std::string& request_body,
+ void runTest(const Http::RequestHeaderMap& request_headers,
+ const std::vector<std::string>& request_body,
const Http::RequestHeaderMap& expected_request_headers,
const std::string& expected_request_body,
const Http::ResponseHeaderMap& upstream_response_headers,
- const std::string& upstream_response_body,
+ const std::vector<std::string>& upstream_response_body,
const Http::ResponseHeaderMap& expected_response_headers,
const std::string& expected_response_body) {

@@ -84,8 +93,11 @@ class WasmFilterIntegrationTest
auto encoder_decoder = codec_client_->startRequest(request_headers);
request_encoder_ = &encoder_decoder.first;
response = std::move(encoder_decoder.second);
- Buffer::OwnedImpl buffer(request_body);
- codec_client_->sendData(*request_encoder_, buffer, true);
+ const auto request_body_size = request_body.size();
+ for (size_t n = 0; n < request_body_size; n++) {
+ Buffer::OwnedImpl buffer(request_body[n]);
+ codec_client_->sendData(*request_encoder_, buffer, n == request_body_size - 1);
+ }
}

ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
@@ -99,8 +111,11 @@ class WasmFilterIntegrationTest
upstream_request_->encodeHeaders(upstream_response_headers, true /*end_stream*/);
} else {
upstream_request_->encodeHeaders(upstream_response_headers, false /*end_stream*/);
- Buffer::OwnedImpl buffer(upstream_response_body);
- upstream_request_->encodeData(buffer, true);
+ const auto upstream_response_body_size = upstream_response_body.size();
+ for (size_t n = 0; n < upstream_response_body_size; n++) {
+ Buffer::OwnedImpl buffer(upstream_response_body[n]);
+ upstream_request_->encodeData(buffer, n == upstream_response_body_size - 1);
+ }
}

ASSERT_TRUE(response->waitForEndStream());
@@ -114,12 +129,21 @@ class WasmFilterIntegrationTest
ASSERT_TRUE(fake_upstream_connection_->close());
ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect());
}

static std::string
testParamsToString(const ::testing::TestParamInfo<
std::tuple<std::tuple<std::string, std::string, bool>, Http::CodecType>>& p) {
return fmt::format("{}_{}_{}_{}", std::get<2>(std::get<0>(p.param)) ? "downstream" : "upstream",
Contributor

optional: if you decide to stick with nested tuples for test params, structured bindings might make this a little easier to read, e.g.

auto [wasm_test_params, codec] = p.param;
auto [runtime, language, direction] = wasm_test_params;
return fmt::format("{}_{}_{}_{}", direction ? "downstream" : "upstream", runtime, language, ...);
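A standalone sketch of the structured-bindings idea follows, assuming C++17. `ToyCodec` is a hypothetical stand-in for `Http::CodecType`, `paramName` is a hypothetical name, and plain `std::string` concatenation replaces `fmt::format` only to avoid an external dependency. The "Http2"/"Http" suffix mirrors the ternary in the diff.

```cpp
#include <string>
#include <tuple>

// Hypothetical stand-in for Http::CodecType, to keep the sketch self-contained.
enum class ToyCodec { HTTP1, HTTP2 };

// ((runtime, language, downstream), codec), the nested shape used by the test.
using NestedParam = std::tuple<std::tuple<std::string, std::string, bool>, ToyCodec>;

// Builds a readable test name by unpacking the nested tuple with structured bindings.
inline std::string paramName(const NestedParam& param) {
  const auto& [wasm_test_params, codec] = param;
  const auto& [runtime, language, downstream] = wasm_test_params;
  return std::string(downstream ? "downstream" : "upstream") + "_" + runtime + "_" + language +
         "_" + (codec == ToyCodec::HTTP2 ? "Http2" : "Http");
}
```

Binding by `const auto&` avoids copying the strings; each name then documents which tuple slot it came from, which is the readability gain the comment is pointing at.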

std::get<0>(std::get<0>(p.param)), std::get<1>(std::get<0>(p.param)),
std::get<1>(p.param) == Http::CodecType::HTTP2 ? "Http2" : "Http");
}
};

- INSTANTIATE_TEST_SUITE_P(
- Runtimes, WasmFilterIntegrationTest,
- Envoy::Extensions::Common::Wasm::dual_filter_sandbox_runtime_and_cpp_values,
- Envoy::Extensions::Common::Wasm::wasmDualFilterTestParamsToString);
+ INSTANTIATE_TEST_SUITE_P(Runtimes, WasmFilterIntegrationTest,
+ testing::Combine(Common::Wasm::dual_filter_sandbox_runtime_and_cpp_values,
+ testing::Values(Http::CodecType::HTTP1,
+ Http::CodecType::HTTP2)),
+ WasmFilterIntegrationTest::testParamsToString);
GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(WasmFilterIntegrationTest);

TEST_P(WasmFilterIntegrationTest, HeadersManipulation) {
@@ -143,8 +167,10 @@ TEST_P(WasmFilterIntegrationTest, HeadersManipulation) {
Http::TestResponseHeaderMapImpl expected_response_headers{
{":status", "200"}, {"content-type", "application/json"}, {"test-status", "OK"}};

- runTest(request_headers, "", expected_request_headers, "", upstream_response_headers, "",
- expected_response_headers, "");
+ auto request_body = std::vector<std::string>{};
+ auto upstream_response_body = std::vector<std::string>{};
+ runTest(request_headers, request_body, expected_request_headers, "", upstream_response_headers,
+ upstream_response_body, expected_response_headers, "");
}

TEST_P(WasmFilterIntegrationTest, BodyManipulation) {
@@ -164,8 +190,34 @@ TEST_P(WasmFilterIntegrationTest, BodyManipulation) {

Http::TestResponseHeaderMapImpl expected_response_headers{{":status", "200"}};

- runTest(request_headers, "request_body", expected_request_headers, "replace",
- upstream_response_headers, "response_body", expected_response_headers, "replace");
+ auto request_body = std::vector<std::string>{{"request_body"}};
+ auto upstream_response_body = std::vector<std::string>{{"upstream_body"}};
+ runTest(request_headers, request_body, expected_request_headers, "replace",
+ upstream_response_headers, upstream_response_body, expected_response_headers, "replace");
}

TEST_P(WasmFilterIntegrationTest, BodyBufferedManipulation) {
setupWasmFilter("", "body");
HttpIntegrationTest::initialize();

Http::TestRequestHeaderMapImpl request_headers{{":scheme", "http"},
{":method", "GET"},
{":path", "/"},
{":authority", "host"},
{"x-test-operation", "SetEndOfBodies"}};

Http::TestRequestHeaderMapImpl expected_request_headers{{":path", "/"}};

Http::TestResponseHeaderMapImpl upstream_response_headers{{":status", "200"},
{"x-test-operation", "SetEndOfBodies"}};

Http::TestResponseHeaderMapImpl expected_response_headers{{":status", "200"}};

auto request_body = std::vector<std::string>{{"request_"}, {"body"}};
auto upstream_response_body = std::vector<std::string>{{"upstream_"}, {"body"}};
Comment on lines +205 to +206
Member

Could we add a test with 3 (or more) pieces?

For the first piece, addDecodedData won't be called because the filter hasn't entered buffering mode yet.
For the second piece, addDecodedData will be called, but it's not the end of the stream, so the filter chain will still be stopped and will handle an empty piece (the data has already been moved by addDecodedData).
For the third piece, addDecodedData will be called and it is the end of the stream, so the filter chain will continue.
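The three-piece walkthrough above can be traced with a small standalone model. This is a heavily simplified sketch, not Envoy code: `ToyBufferingFilter`, `ToyStatus`, and `runChunks` are hypothetical names, strings stand in for buffers, and the host is assumed to append whatever remains in the chunk to the buffered body whenever the filter asks to stop and buffer. The ".0" marker follows the SetEndOfBodies test operation in this PR.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

enum class ToyStatus { Continue, StopIterationAndBuffer };

// Hypothetical, simplified model of one filter that buffers a chunked body.
class ToyBufferingFilter {
public:
  // Processes one chunk; `chunk` is drained whenever it is moved into the buffered body.
  ToyStatus onData(std::string& chunk, bool end_stream) {
    if (buffering_) {
      // Models the call added by the fix: move the new chunk into the buffered body first.
      buffered_ += chunk;
      chunk.clear();
    }
    // Assumption: whatever is still in `chunk` ends up in the body the plugin sees
    // (host-side buffering on stop, or the chunk itself on the final call).
    buffered_ += chunk;
    chunk.clear();
    if (!end_stream) {
      buffering_ = true;
      return ToyStatus::StopIterationAndBuffer;
    }
    buffered_ += ".0"; // the test plugin appends a marker to the complete body
    buffering_ = false;
    return ToyStatus::Continue;
  }

  const std::string& body() const { return buffered_; }

private:
  bool buffering_ = false;
  std::string buffered_;
};

// Feeds all chunks through the model and returns the final body.
inline std::string runChunks(std::vector<std::string> chunks) {
  assert(!chunks.empty());
  ToyBufferingFilter filter;
  for (std::size_t i = 0; i < chunks.size(); ++i) {
    const bool last = (i + 1 == chunks.size());
    const ToyStatus status = filter.onData(chunks[i], last);
    assert((status == ToyStatus::Continue) == last); // only the last chunk continues
    (void)status;
  }
  return filter.body();
}
```

In this model the first piece takes the non-buffering path, the second is moved in and leaves an empty remainder, and the third is moved in before the marker is appended, so two, three, or more pieces all yield the same final body.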

runTest(request_headers, request_body, expected_request_headers, "request_body.0",
upstream_response_headers, upstream_response_body, expected_response_headers,
"upstream_body.0");
}

} // namespace