diff --git a/contrib/golang/common/dso/dso.cc b/contrib/golang/common/dso/dso.cc index ce73364538319..64bcde152fea5 100644 --- a/contrib/golang/common/dso/dso.cc +++ b/contrib/golang/common/dso/dso.cc @@ -59,6 +59,9 @@ HttpFilterDsoImpl::HttpFilterDsoImpl(const std::string dso_name) : HttpFilterDso envoy_go_filter_on_http_data_, handler_, dso_name, "envoyGoFilterOnHttpData"); loaded_ &= dlsymInternal( envoy_go_filter_on_http_log_, handler_, dso_name, "envoyGoFilterOnHttpLog"); + loaded_ &= dlsymInternal( + envoy_go_filter_on_http_stream_complete_, handler_, dso_name, + "envoyGoFilterOnHttpStreamComplete"); loaded_ &= dlsymInternal( envoy_go_filter_on_http_destroy_, handler_, dso_name, "envoyGoFilterOnHttpDestroy"); loaded_ &= dlsymInternal( @@ -100,6 +103,11 @@ void HttpFilterDsoImpl::envoyGoFilterOnHttpLog(httpRequest* p0, int p1) { envoy_go_filter_on_http_log_(p0, GoUint64(p1)); } +void HttpFilterDsoImpl::envoyGoFilterOnHttpStreamComplete(httpRequest* p0) { + ASSERT(envoy_go_filter_on_http_stream_complete_ != nullptr); + envoy_go_filter_on_http_stream_complete_(p0); +} + void HttpFilterDsoImpl::envoyGoFilterOnHttpDestroy(httpRequest* p0, int p1) { ASSERT(envoy_go_filter_on_http_destroy_ != nullptr); envoy_go_filter_on_http_destroy_(p0, GoUint64(p1)); diff --git a/contrib/golang/common/dso/dso.h b/contrib/golang/common/dso/dso.h index 78c7c0d768ef8..eb30864d183ce 100644 --- a/contrib/golang/common/dso/dso.h +++ b/contrib/golang/common/dso/dso.h @@ -44,6 +44,7 @@ class HttpFilterDso : public Dso { virtual GoUint64 envoyGoFilterOnHttpData(processState* p0, GoUint64 p1, GoUint64 p2, GoUint64 p3) PURE; virtual void envoyGoFilterOnHttpLog(httpRequest* p0, int p1) PURE; + virtual void envoyGoFilterOnHttpStreamComplete(httpRequest* p0) PURE; virtual void envoyGoFilterOnHttpDestroy(httpRequest* p0, int p1) PURE; virtual void envoyGoRequestSemaDec(httpRequest* p0) PURE; }; @@ -62,6 +63,7 @@ class HttpFilterDsoImpl : public HttpFilterDso { GoUint64 envoyGoFilterOnHttpData(processState* p0, GoUint64 p1, GoUint64 p2, GoUint64 p3) override; void envoyGoFilterOnHttpLog(httpRequest* p0, int p1) override; + void envoyGoFilterOnHttpStreamComplete(httpRequest* p0) override; void envoyGoFilterOnHttpDestroy(httpRequest* p0, int p1) override; void envoyGoRequestSemaDec(httpRequest* p0) override; void cleanup() override; @@ -76,6 +78,7 @@ class HttpFilterDsoImpl : public HttpFilterDso { GoUint64 (*envoy_go_filter_on_http_data_)(processState* p0, GoUint64 p1, GoUint64 p2, GoUint64 p3) = {nullptr}; void (*envoy_go_filter_on_http_log_)(httpRequest* p0, GoUint64 p1) = {nullptr}; + void (*envoy_go_filter_on_http_stream_complete_)(httpRequest* p0) = {nullptr}; void (*envoy_go_filter_on_http_destroy_)(httpRequest* p0, GoUint64 p1) = {nullptr}; void (*envoy_go_filter_go_request_sema_dec_)(httpRequest* p0) = {nullptr}; void (*envoy_go_filter_cleanup_)() = {nullptr}; diff --git a/contrib/golang/common/dso/libgolang.h b/contrib/golang/common/dso/libgolang.h index b477b5b55e6bd..a35e14840509c 100644 --- a/contrib/golang/common/dso/libgolang.h +++ b/contrib/golang/common/dso/libgolang.h @@ -122,6 +122,10 @@ extern GoUint64 envoyGoFilterOnHttpData(processState* s, GoUint64 end_stream, Go // github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http.envoyGoFilterOnHttpLog extern void envoyGoFilterOnHttpLog(httpRequest* r, GoUint64 type); +// go:linkname envoyGoFilterOnHttpStreamComplete +// github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http.envoyGoFilterOnHttpStreamComplete +extern void envoyGoFilterOnHttpStreamComplete(httpRequest* r); + // go:linkname envoyGoFilterOnHttpDestroy // github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http.envoyGoFilterOnHttpDestroy extern void envoyGoFilterOnHttpDestroy(httpRequest* r, GoUint64 reason); diff --git a/contrib/golang/common/dso/test/mocks.h b/contrib/golang/common/dso/test/mocks.h index 973a096bfb10b..29372717ef912 100644 --- a/contrib/golang/common/dso/test/mocks.h +++ b/contrib/golang/common/dso/test/mocks.h @@ -22,6 +22,7 @@ class MockHttpFilterDsoImpl : public HttpFilterDso { MOCK_METHOD(GoUint64, envoyGoFilterOnHttpData, (processState * p0, GoUint64 p1, GoUint64 p2, GoUint64 p3)); MOCK_METHOD(void, envoyGoFilterOnHttpLog, (httpRequest * p0, int p1)); + MOCK_METHOD(void, envoyGoFilterOnHttpStreamComplete, (httpRequest * p0)); MOCK_METHOD(void, envoyGoFilterOnHttpDestroy, (httpRequest * p0, int p1)); MOCK_METHOD(void, envoyGoRequestSemaDec, (httpRequest * p0)); MOCK_METHOD(void, envoyGoFilterCleanUp, ()); diff --git a/contrib/golang/common/dso/test/test_data/simple.go b/contrib/golang/common/dso/test/test_data/simple.go index a0ce2419bb2fe..1651776ddb05e 100644 --- a/contrib/golang/common/dso/test/test_data/simple.go +++ b/contrib/golang/common/dso/test/test_data/simple.go @@ -60,6 +60,10 @@ func envoyGoFilterOnHttpData(s *C.processState, endStream, buffer, length uint64 func envoyGoFilterOnHttpLog(r *C.httpRequest, logType uint64) { } +//export envoyGoFilterOnHttpStreamComplete +func envoyGoFilterOnHttpStreamComplete(r *C.httpRequest) { +} + //export envoyGoFilterOnHttpDestroy func envoyGoFilterOnHttpDestroy(r *C.httpRequest, reason uint64) { } diff --git a/contrib/golang/common/go/api/filter.go b/contrib/golang/common/go/api/filter.go index f33e1e90ae719..0c3a7dd8a1696 100644 --- a/contrib/golang/common/go/api/filter.go +++ b/contrib/golang/common/go/api/filter.go @@ -87,7 +87,7 @@ type StreamFilter interface { // destroy filter OnDestroy(DestroyReason) - // TODO add more for stream complete + OnStreamComplete() } func (*PassThroughStreamFilter) OnLog() { @@ -102,6 +102,9 @@ func (*PassThroughStreamFilter) OnLogDownstreamPeriodic() { func (*PassThroughStreamFilter) OnDestroy(DestroyReason) { } +func (*PassThroughStreamFilter) OnStreamComplete() { +} + type StreamFilterConfigParser interface { // Parse the proto message to any Go value, and return error to reject the config. // This is called when Envoy receives the config from the control plane. diff --git a/contrib/golang/filters/http/source/go/pkg/http/shim.go b/contrib/golang/filters/http/source/go/pkg/http/shim.go index c904ee5f98a2b..3836e0b531905 100644 --- a/contrib/golang/filters/http/source/go/pkg/http/shim.go +++ b/contrib/golang/filters/http/source/go/pkg/http/shim.go @@ -289,6 +289,15 @@ func envoyGoFilterOnHttpLog(r *C.httpRequest, logType uint64) { } } +//export envoyGoFilterOnHttpStreamComplete +func envoyGoFilterOnHttpStreamComplete(r *C.httpRequest) { + req := getRequest(r) + defer req.recoverPanic() + + f := req.httpFilter + f.OnStreamComplete() +} + //export envoyGoFilterOnHttpDestroy func envoyGoFilterOnHttpDestroy(r *C.httpRequest, reason uint64) { req := getRequest(r) diff --git a/contrib/golang/filters/http/source/golang_filter.cc b/contrib/golang/filters/http/source/golang_filter.cc index 486d16da589d5..276c6238c7549 100644 --- a/contrib/golang/filters/http/source/golang_filter.cc +++ b/contrib/golang/filters/http/source/golang_filter.cc @@ -129,6 +129,13 @@ Http::FilterTrailersStatus Filter::encodeTrailers(Http::ResponseTrailerMap& trai return done ? Http::FilterTrailersStatus::Continue : Http::FilterTrailersStatus::StopIteration; } +void Filter::onStreamComplete() { + // This only runs in the work thread, it's safe even without lock. + is_golang_processing_stream_complete_ = true; + dynamic_lib_->envoyGoFilterOnHttpStreamComplete(req_); + is_golang_processing_stream_complete_ = false; +} + void Filter::onDestroy() { ENVOY_LOG(debug, "golang filter on destroy"); @@ -170,7 +177,7 @@ void Filter::log(const Formatter::HttpFormatterContext& log_context, const_cast(&log_context.requestHeaders())); } - // This only run in the work thread, it's safe even without lock. + // This only runs in the work thread, it's safe even without lock. is_golang_processing_log_ = true; dynamic_lib_->envoyGoFilterOnHttpLog(req_, int(log_context.accessLogType())); is_golang_processing_log_ = false; diff --git a/contrib/golang/filters/http/source/golang_filter.h b/contrib/golang/filters/http/source/golang_filter.h index f3a9a461c7e60..e07de8095cc3f 100644 --- a/contrib/golang/filters/http/source/golang_filter.h +++ b/contrib/golang/filters/http/source/golang_filter.h @@ -223,6 +223,7 @@ class Filter : public Http::StreamFilter, } // Http::StreamFilterBase + void onStreamComplete() override; void onDestroy() ABSL_LOCKS_EXCLUDED(mutex_) override; Http::LocalErrorStatus onLocalReply(const LocalReplyData&) override; @@ -255,8 +256,6 @@ class Filter : public Http::StreamFilter, void log(const Formatter::HttpFormatterContext& log_context, const StreamInfo::StreamInfo& info) override; - void onStreamComplete() override {} - CAPIStatus clearRouteCache(); CAPIStatus continueStatus(ProcessorState& state, GolangStatus status); @@ -293,8 +292,8 @@ class Filter : public Http::StreamFilter, GoInt32* rc); bool isProcessingInGo() { - return is_golang_processing_log_ || decoding_state_.isProcessingInGo() || - encoding_state_.isProcessingInGo(); + return is_golang_processing_stream_complete_ || is_golang_processing_log_ || + decoding_state_.isProcessingInGo() || encoding_state_.isProcessingInGo(); } void deferredDeleteRequest(HttpRequestInternal* req); @@ -361,6 +360,7 @@ class Filter : public Http::StreamFilter, Thread::MutexBasicLockable mutex_{}; bool has_destroyed_ ABSL_GUARDED_BY(mutex_){false}; + bool is_golang_processing_stream_complete_{false}; bool is_golang_processing_log_{false}; }; diff --git a/contrib/golang/filters/http/test/golang_filter_fuzz_test.cc b/contrib/golang/filters/http/test/golang_filter_fuzz_test.cc index c34b4cc1accb2..7bc2d138a07ae 100644 --- a/contrib/golang/filters/http/test/golang_filter_fuzz_test.cc +++ b/contrib/golang/filters/http/test/golang_filter_fuzz_test.cc @@ -56,6 +56,8 @@ DEFINE_PROTO_FUZZER(const envoy::extensions::filters::http::golang::GolangFilter .WillByDefault(Return(static_cast(GolangStatus::Continue))); ON_CALL(*dso_lib.get(), envoyGoFilterOnHttpLog(_, _)) .WillByDefault(Invoke([&](httpRequest*, int) -> void {})); + ON_CALL(*dso_lib.get(), envoyGoFilterOnHttpStreamComplete(_)) + .WillByDefault(Invoke([&](httpRequest*) -> void {})); ON_CALL(*dso_lib.get(), envoyGoFilterOnHttpDestroy(_, _)) .WillByDefault(Invoke([&](httpRequest* p0, int) -> void { // delete the filter->req_, make LeakSanitizer happy. diff --git a/contrib/golang/filters/http/test/golang_integration_test.cc b/contrib/golang/filters/http/test/golang_integration_test.cc index cddf317e93680..4ffe516897435 100644 --- a/contrib/golang/filters/http/test/golang_integration_test.cc +++ b/contrib/golang/filters/http/test/golang_integration_test.cc @@ -11,6 +11,8 @@ namespace Envoy { +using testing::HasSubstr; + // helper function absl::string_view getHeader(const Http::HeaderMap& headers, absl::string_view key) { auto values = headers.get(Http::LowerCaseString(key)); @@ -888,6 +890,7 @@ TEST_P(GolangIntegrationTest, Property) { } TEST_P(GolangIntegrationTest, AccessLog) { + useAccessLog("%DYNAMIC_METADATA(golang:access_log_var)%"); initializeBasicFilter(ACCESSLOG, "test.com"); auto path = "/test"; @@ -918,6 +921,9 @@ TEST_P(GolangIntegrationTest, AccessLog) { ASSERT_TRUE(response->waitForEndStream()); codec_client_->close(); + std::string log = waitForAccessLog(access_log_name_); + EXPECT_THAT(log, HasSubstr("access_log_var written by Golang filter")); + // use the second request to get the logged data codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); response = sendRequestAndWaitForResponse(request_headers, 0, default_response_headers_, 0); diff --git a/contrib/golang/filters/http/test/test_data/access_log/filter.go b/contrib/golang/filters/http/test/test_data/access_log/filter.go index 8b7e8233f6309..3dc4d7d156582 100644 --- a/contrib/golang/filters/http/test/test_data/access_log/filter.go +++ b/contrib/golang/filters/http/test/test_data/access_log/filter.go @@ -100,6 +100,10 @@ func (f *filter) OnLogDownstreamPeriodic() { }() } +func (f *filter) OnStreamComplete() { + f.callbacks.StreamInfo().DynamicMetadata().Set("golang", "access_log_var", "access_log_var written by Golang filter") +} + func (f *filter) OnLog() { code, ok := f.callbacks.StreamInfo().ResponseCode() if !ok {