Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

golang: add OnStreamComplete callback to mutate final metadata #35742

Merged
merged 1 commit into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions contrib/golang/common/dso/dso.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ HttpFilterDsoImpl::HttpFilterDsoImpl(const std::string dso_name) : HttpFilterDso
envoy_go_filter_on_http_data_, handler_, dso_name, "envoyGoFilterOnHttpData");
loaded_ &= dlsymInternal<decltype(envoy_go_filter_on_http_log_)>(
envoy_go_filter_on_http_log_, handler_, dso_name, "envoyGoFilterOnHttpLog");
loaded_ &= dlsymInternal<decltype(envoy_go_filter_on_http_stream_complete_)>(
envoy_go_filter_on_http_stream_complete_, handler_, dso_name,
"envoyGoFilterOnHttpStreamComplete");
loaded_ &= dlsymInternal<decltype(envoy_go_filter_on_http_destroy_)>(
envoy_go_filter_on_http_destroy_, handler_, dso_name, "envoyGoFilterOnHttpDestroy");
loaded_ &= dlsymInternal<decltype(envoy_go_filter_go_request_sema_dec_)>(
Expand Down Expand Up @@ -103,6 +106,11 @@ void HttpFilterDsoImpl::envoyGoFilterOnHttpLog(httpRequest* p0, int p1, processS
envoy_go_filter_on_http_log_(p0, GoUint64(p1), p2, p3, p4, p5, p6, p7, p8, p9, p10, p11);
}

void HttpFilterDsoImpl::envoyGoFilterOnHttpStreamComplete(httpRequest* p0) {
ASSERT(envoy_go_filter_on_http_stream_complete_ != nullptr);
envoy_go_filter_on_http_stream_complete_(p0);
}

void HttpFilterDsoImpl::envoyGoFilterOnHttpDestroy(httpRequest* p0, int p1) {
ASSERT(envoy_go_filter_on_http_destroy_ != nullptr);
envoy_go_filter_on_http_destroy_(p0, GoUint64(p1));
Expand Down
3 changes: 3 additions & 0 deletions contrib/golang/common/dso/dso.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class HttpFilterDso : public Dso {
virtual void envoyGoFilterOnHttpLog(httpRequest* p0, int p1, processState* p2, processState* p3,
GoUint64 p4, GoUint64 p5, GoUint64 p6, GoUint64 p7,
GoUint64 p8, GoUint64 p9, GoUint64 p10, GoUint64 p11) PURE;
virtual void envoyGoFilterOnHttpStreamComplete(httpRequest* p0) PURE;
virtual void envoyGoFilterOnHttpDestroy(httpRequest* p0, int p1) PURE;
virtual void envoyGoRequestSemaDec(httpRequest* p0) PURE;
};
Expand All @@ -66,6 +67,7 @@ class HttpFilterDsoImpl : public HttpFilterDso {
void envoyGoFilterOnHttpLog(httpRequest* p0, int p1, processState* p2, processState* p3,
GoUint64 p4, GoUint64 p5, GoUint64 p6, GoUint64 p7, GoUint64 p8,
GoUint64 p9, GoUint64 p10, GoUint64 p11) override;
void envoyGoFilterOnHttpStreamComplete(httpRequest* p0) override;
void envoyGoFilterOnHttpDestroy(httpRequest* p0, int p1) override;
void envoyGoRequestSemaDec(httpRequest* p0) override;
void cleanup() override;
Expand All @@ -83,6 +85,7 @@ class HttpFilterDsoImpl : public HttpFilterDso {
GoUint64 p4, GoUint64 p5, GoUint64 p6, GoUint64 p7,
GoUint64 p8, GoUint64 p9, GoUint64 p10,
GoUint64 p11) = {nullptr};
void (*envoy_go_filter_on_http_stream_complete_)(httpRequest* p0) = {nullptr};
void (*envoy_go_filter_on_http_destroy_)(httpRequest* p0, GoUint64 p1) = {nullptr};
void (*envoy_go_filter_go_request_sema_dec_)(httpRequest* p0) = {nullptr};
void (*envoy_go_filter_cleanup_)() = {nullptr};
Expand Down
4 changes: 4 additions & 0 deletions contrib/golang/common/dso/libgolang.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ extern void envoyGoFilterOnHttpLog(httpRequest* r, GoUint64 type, processState*
GoUint64 resp_header_bytes, GoUint64 resp_trailer_num,
GoUint64 resp_trailer_bytes);

// go:linkname envoyGoFilterOnHttpStreamComplete
// github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http.envoyGoFilterOnHttpStreamComplete
extern void envoyGoFilterOnHttpStreamComplete(httpRequest* r);

// go:linkname envoyGoFilterOnHttpDestroy
// github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http.envoyGoFilterOnHttpDestroy
extern void envoyGoFilterOnHttpDestroy(httpRequest* r, GoUint64 reason);
Expand Down
1 change: 1 addition & 0 deletions contrib/golang/common/dso/test/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class MockHttpFilterDsoImpl : public HttpFilterDso {
(httpRequest * p0, int p1, processState* p2, processState* p3, GoUint64 p4,
GoUint64 p5, GoUint64 p6, GoUint64 p7, GoUint64 p8, GoUint64 p9, GoUint64 p10,
GoUint64 p11));
MOCK_METHOD(void, envoyGoFilterOnHttpStreamComplete, (httpRequest * p0));
MOCK_METHOD(void, envoyGoFilterOnHttpDestroy, (httpRequest * p0, int p1));
MOCK_METHOD(void, envoyGoRequestSemaDec, (httpRequest * p0));
MOCK_METHOD(void, envoyGoFilterCleanUp, ());
Expand Down
4 changes: 4 additions & 0 deletions contrib/golang/common/dso/test/test_data/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func envoyGoFilterOnHttpLog(r *C.httpRequest, logType uint64, decodingState *C.p
respHeaderNum, respHeaderBytes, respTrailerNum, respTrailerBytes uint64) {
}

//export envoyGoFilterOnHttpStreamComplete
func envoyGoFilterOnHttpStreamComplete(r *C.httpRequest) {
}

//export envoyGoFilterOnHttpDestroy
func envoyGoFilterOnHttpDestroy(r *C.httpRequest, reason uint64) {
}
Expand Down
5 changes: 4 additions & 1 deletion contrib/golang/common/go/api/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type StreamFilter interface {

// destroy filter
OnDestroy(DestroyReason)
// TODO add more for stream complete
OnStreamComplete()
}

func (*PassThroughStreamFilter) OnLog(RequestHeaderMap, RequestTrailerMap, ResponseHeaderMap, ResponseTrailerMap) {
Expand All @@ -102,6 +102,9 @@ func (*PassThroughStreamFilter) OnLogDownstreamPeriodic(RequestHeaderMap, Reques
func (*PassThroughStreamFilter) OnDestroy(DestroyReason) {
}

func (*PassThroughStreamFilter) OnStreamComplete() {
}

type StreamFilterConfigParser interface {
// Parse the proto message to any Go value, and return error to reject the config.
// This is called when Envoy receives the config from the control plane.
Expand Down
9 changes: 9 additions & 0 deletions contrib/golang/filters/http/source/go/pkg/http/shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,15 @@ func envoyGoFilterOnHttpLog(r *C.httpRequest, logType uint64,
}
}

//export envoyGoFilterOnHttpStreamComplete
func envoyGoFilterOnHttpStreamComplete(r *C.httpRequest) {
req := getRequest(r)
defer req.recoverPanic()

f := req.httpFilter
f.OnStreamComplete()
}

//export envoyGoFilterOnHttpDestroy
func envoyGoFilterOnHttpDestroy(r *C.httpRequest, reason uint64) {
req := getRequest(r)
Expand Down
8 changes: 8 additions & 0 deletions contrib/golang/filters/http/source/golang_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,14 @@ Http::FilterTrailersStatus Filter::encodeTrailers(Http::ResponseTrailerMap& trai
return done ? Http::FilterTrailersStatus::Continue : Http::FilterTrailersStatus::StopIteration;
}

void Filter::onStreamComplete() {
// We reuse the same flag for both onStreamComplete & log to save the space,
// since they are exclusive and serve for the access log purpose.
req_->is_golang_processing_log = 1;
dynamic_lib_->envoyGoFilterOnHttpStreamComplete(req_);
req_->is_golang_processing_log = 0;
}

void Filter::onDestroy() {
ENVOY_LOG(debug, "golang filter on destroy");

Expand Down
3 changes: 1 addition & 2 deletions contrib/golang/filters/http/source/golang_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ class Filter : public Http::StreamFilter,
}

// Http::StreamFilterBase
void onStreamComplete() override;
void onDestroy() ABSL_LOCKS_EXCLUDED(mutex_) override;
Http::LocalErrorStatus onLocalReply(const LocalReplyData&) override;

Expand Down Expand Up @@ -255,8 +256,6 @@ class Filter : public Http::StreamFilter,
void log(const Formatter::HttpFormatterContext& log_context,
const StreamInfo::StreamInfo& info) override;

void onStreamComplete() override {}

CAPIStatus clearRouteCache();
CAPIStatus continueStatus(ProcessorState& state, GolangStatus status);

Expand Down
2 changes: 2 additions & 0 deletions contrib/golang/filters/http/test/golang_filter_fuzz_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ DEFINE_PROTO_FUZZER(const envoy::extensions::filters::http::golang::GolangFilter
.WillByDefault(
Invoke([&](httpRequest*, int, processState*, processState*, GoUint64, GoUint64, GoUint64,
GoUint64, GoUint64, GoUint64, GoUint64, GoUint64) -> void {}));
ON_CALL(*dso_lib.get(), envoyGoFilterOnHttpStreamComplete(_))
.WillByDefault(Invoke([&](httpRequest*) -> void {}));
ON_CALL(*dso_lib.get(), envoyGoFilterOnHttpDestroy(_, _))
.WillByDefault(Invoke([&](httpRequest* p0, int) -> void {
// delete the filter->req_, make LeakSanitizer happy.
Expand Down
6 changes: 6 additions & 0 deletions contrib/golang/filters/http/test/golang_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

namespace Envoy {

using testing::HasSubstr;

// helper function
absl::string_view getHeader(const Http::HeaderMap& headers, absl::string_view key) {
auto values = headers.get(Http::LowerCaseString(key));
Expand Down Expand Up @@ -888,6 +890,7 @@ TEST_P(GolangIntegrationTest, Property) {
}

TEST_P(GolangIntegrationTest, AccessLog) {
useAccessLog("%DYNAMIC_METADATA(golang:access_log_var)%");
initializeBasicFilter(ACCESSLOG, "test.com");

auto path = "/test";
Expand Down Expand Up @@ -924,6 +927,9 @@ TEST_P(GolangIntegrationTest, AccessLog) {
ASSERT_TRUE(response->waitForEndStream());
codec_client_->close();

std::string log = waitForAccessLog(access_log_name_);
EXPECT_THAT(log, HasSubstr("access_log_var written by Golang filter"));

// use the second request to get the logged data
codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http"))));
response = sendRequestAndWaitForResponse(request_headers, 0, default_response_headers_, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ func (f *filter) OnLogDownstreamPeriodic(reqHeader api.RequestHeaderMap, reqTrai
}()
}

func (f *filter) OnStreamComplete() {
f.callbacks.StreamInfo().DynamicMetadata().Set("golang", "access_log_var", "access_log_var written by Golang filter")
}

func (f *filter) OnLog(reqHeader api.RequestHeaderMap, reqTrailer api.RequestTrailerMap, respHeader api.ResponseHeaderMap, respTrailer api.ResponseTrailerMap) {
referer, err := f.callbacks.GetProperty("request.referer")
if err != nil {
Expand Down