Skip to content

Commit

Permalink
golang: add OnStreamComplete callback to mutate final metadata
Browse files Browse the repository at this point in the history
As discussed from https:
//github.com/envoyproxy/pull/35595#issuecomment-2278413889,
the `AccessLogHandler::log` is not designed for mutating the
StreamInfo. It's recommended to use `OnStreamComplete` to do the
final metadata management.

Signed-off-by: spacewander <[email protected]>
  • Loading branch information
spacewander committed Aug 19, 2024
1 parent ccae683 commit e558920
Show file tree
Hide file tree
Showing 12 changed files with 57 additions and 6 deletions.
8 changes: 8 additions & 0 deletions contrib/golang/common/dso/dso.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ HttpFilterDsoImpl::HttpFilterDsoImpl(const std::string dso_name) : HttpFilterDso
envoy_go_filter_on_http_data_, handler_, dso_name, "envoyGoFilterOnHttpData");
loaded_ &= dlsymInternal<decltype(envoy_go_filter_on_http_log_)>(
envoy_go_filter_on_http_log_, handler_, dso_name, "envoyGoFilterOnHttpLog");
loaded_ &= dlsymInternal<decltype(envoy_go_filter_on_http_stream_complete_)>(
envoy_go_filter_on_http_stream_complete_, handler_, dso_name,
"envoyGoFilterOnHttpStreamComplete");
loaded_ &= dlsymInternal<decltype(envoy_go_filter_on_http_destroy_)>(
envoy_go_filter_on_http_destroy_, handler_, dso_name, "envoyGoFilterOnHttpDestroy");
loaded_ &= dlsymInternal<decltype(envoy_go_filter_go_request_sema_dec_)>(
Expand Down Expand Up @@ -100,6 +103,11 @@ void HttpFilterDsoImpl::envoyGoFilterOnHttpLog(httpRequest* p0, int p1) {
envoy_go_filter_on_http_log_(p0, GoUint64(p1));
}

void HttpFilterDsoImpl::envoyGoFilterOnHttpStreamComplete(httpRequest* p0) {
ASSERT(envoy_go_filter_on_http_stream_complete_ != nullptr);
envoy_go_filter_on_http_stream_complete_(p0);
}

void HttpFilterDsoImpl::envoyGoFilterOnHttpDestroy(httpRequest* p0, int p1) {
ASSERT(envoy_go_filter_on_http_destroy_ != nullptr);
envoy_go_filter_on_http_destroy_(p0, GoUint64(p1));
Expand Down
3 changes: 3 additions & 0 deletions contrib/golang/common/dso/dso.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class HttpFilterDso : public Dso {
virtual GoUint64 envoyGoFilterOnHttpData(processState* p0, GoUint64 p1, GoUint64 p2,
GoUint64 p3) PURE;
virtual void envoyGoFilterOnHttpLog(httpRequest* p0, int p1) PURE;
virtual void envoyGoFilterOnHttpStreamComplete(httpRequest* p0) PURE;
virtual void envoyGoFilterOnHttpDestroy(httpRequest* p0, int p1) PURE;
virtual void envoyGoRequestSemaDec(httpRequest* p0) PURE;
};
Expand All @@ -62,6 +63,7 @@ class HttpFilterDsoImpl : public HttpFilterDso {
GoUint64 envoyGoFilterOnHttpData(processState* p0, GoUint64 p1, GoUint64 p2,
GoUint64 p3) override;
void envoyGoFilterOnHttpLog(httpRequest* p0, int p1) override;
void envoyGoFilterOnHttpStreamComplete(httpRequest* p0) override;
void envoyGoFilterOnHttpDestroy(httpRequest* p0, int p1) override;
void envoyGoRequestSemaDec(httpRequest* p0) override;
void cleanup() override;
Expand All @@ -76,6 +78,7 @@ class HttpFilterDsoImpl : public HttpFilterDso {
GoUint64 (*envoy_go_filter_on_http_data_)(processState* p0, GoUint64 p1, GoUint64 p2,
GoUint64 p3) = {nullptr};
void (*envoy_go_filter_on_http_log_)(httpRequest* p0, GoUint64 p1) = {nullptr};
void (*envoy_go_filter_on_http_stream_complete_)(httpRequest* p0) = {nullptr};
void (*envoy_go_filter_on_http_destroy_)(httpRequest* p0, GoUint64 p1) = {nullptr};
void (*envoy_go_filter_go_request_sema_dec_)(httpRequest* p0) = {nullptr};
void (*envoy_go_filter_cleanup_)() = {nullptr};
Expand Down
4 changes: 4 additions & 0 deletions contrib/golang/common/dso/libgolang.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ extern GoUint64 envoyGoFilterOnHttpData(processState* s, GoUint64 end_stream, Go
// github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http.envoyGoFilterOnHttpLog
extern void envoyGoFilterOnHttpLog(httpRequest* r, GoUint64 type);

// go:linkname envoyGoFilterOnHttpStreamComplete
// github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http.envoyGoFilterOnHttpStreamComplete
extern void envoyGoFilterOnHttpStreamComplete(httpRequest* r);

// go:linkname envoyGoFilterOnHttpDestroy
// github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http.envoyGoFilterOnHttpDestroy
extern void envoyGoFilterOnHttpDestroy(httpRequest* r, GoUint64 reason);
Expand Down
1 change: 1 addition & 0 deletions contrib/golang/common/dso/test/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class MockHttpFilterDsoImpl : public HttpFilterDso {
MOCK_METHOD(GoUint64, envoyGoFilterOnHttpData,
(processState * p0, GoUint64 p1, GoUint64 p2, GoUint64 p3));
MOCK_METHOD(void, envoyGoFilterOnHttpLog, (httpRequest * p0, int p1));
MOCK_METHOD(void, envoyGoFilterOnHttpStreamComplete, (httpRequest * p0));
MOCK_METHOD(void, envoyGoFilterOnHttpDestroy, (httpRequest * p0, int p1));
MOCK_METHOD(void, envoyGoRequestSemaDec, (httpRequest * p0));
MOCK_METHOD(void, envoyGoFilterCleanUp, ());
Expand Down
4 changes: 4 additions & 0 deletions contrib/golang/common/dso/test/test_data/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func envoyGoFilterOnHttpData(s *C.processState, endStream, buffer, length uint64
func envoyGoFilterOnHttpLog(r *C.httpRequest, logType uint64) {
}

//export envoyGoFilterOnHttpStreamComplete
func envoyGoFilterOnHttpStreamComplete(r *C.httpRequest) {
}

//export envoyGoFilterOnHttpDestroy
func envoyGoFilterOnHttpDestroy(r *C.httpRequest, reason uint64) {
}
Expand Down
5 changes: 4 additions & 1 deletion contrib/golang/common/go/api/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type StreamFilter interface {

// destroy filter
OnDestroy(DestroyReason)
// TODO add more for stream complete
OnStreamComplete()
}

func (*PassThroughStreamFilter) OnLog() {
Expand All @@ -102,6 +102,9 @@ func (*PassThroughStreamFilter) OnLogDownstreamPeriodic() {
func (*PassThroughStreamFilter) OnDestroy(DestroyReason) {
}

func (*PassThroughStreamFilter) OnStreamComplete() {
}

type StreamFilterConfigParser interface {
// Parse the proto message to any Go value, and return error to reject the config.
// This is called when Envoy receives the config from the control plane.
Expand Down
9 changes: 9 additions & 0 deletions contrib/golang/filters/http/source/go/pkg/http/shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,15 @@ func envoyGoFilterOnHttpLog(r *C.httpRequest, logType uint64) {
}
}

//export envoyGoFilterOnHttpStreamComplete
func envoyGoFilterOnHttpStreamComplete(r *C.httpRequest) {
req := getRequest(r)
defer req.recoverPanic()

f := req.httpFilter
f.OnStreamComplete()
}

//export envoyGoFilterOnHttpDestroy
func envoyGoFilterOnHttpDestroy(r *C.httpRequest, reason uint64) {
req := getRequest(r)
Expand Down
9 changes: 8 additions & 1 deletion contrib/golang/filters/http/source/golang_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ Http::FilterTrailersStatus Filter::encodeTrailers(Http::ResponseTrailerMap& trai
return done ? Http::FilterTrailersStatus::Continue : Http::FilterTrailersStatus::StopIteration;
}

void Filter::onStreamComplete() {
// This only runs in the work thread, it's safe even without lock.
is_golang_processing_stream_complete_ = true;
dynamic_lib_->envoyGoFilterOnHttpStreamComplete(req_);
is_golang_processing_stream_complete_ = false;
}

void Filter::onDestroy() {
ENVOY_LOG(debug, "golang filter on destroy");

Expand Down Expand Up @@ -170,7 +177,7 @@ void Filter::log(const Formatter::HttpFormatterContext& log_context,
const_cast<Http::RequestHeaderMap*>(&log_context.requestHeaders()));
}

// This only run in the work thread, it's safe even without lock.
// This only runs in the work thread, it's safe even without lock.
is_golang_processing_log_ = true;
dynamic_lib_->envoyGoFilterOnHttpLog(req_, int(log_context.accessLogType()));
is_golang_processing_log_ = false;
Expand Down
8 changes: 4 additions & 4 deletions contrib/golang/filters/http/source/golang_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ class Filter : public Http::StreamFilter,
}

// Http::StreamFilterBase
void onStreamComplete() override;
void onDestroy() ABSL_LOCKS_EXCLUDED(mutex_) override;
Http::LocalErrorStatus onLocalReply(const LocalReplyData&) override;

Expand Down Expand Up @@ -255,8 +256,6 @@ class Filter : public Http::StreamFilter,
void log(const Formatter::HttpFormatterContext& log_context,
const StreamInfo::StreamInfo& info) override;

void onStreamComplete() override {}

CAPIStatus clearRouteCache();
CAPIStatus continueStatus(ProcessorState& state, GolangStatus status);

Expand Down Expand Up @@ -293,8 +292,8 @@ class Filter : public Http::StreamFilter,
GoInt32* rc);

bool isProcessingInGo() {
return is_golang_processing_log_ || decoding_state_.isProcessingInGo() ||
encoding_state_.isProcessingInGo();
return is_golang_processing_stream_complete_ || is_golang_processing_log_ ||
decoding_state_.isProcessingInGo() || encoding_state_.isProcessingInGo();
}
void deferredDeleteRequest(HttpRequestInternal* req);

Expand Down Expand Up @@ -361,6 +360,7 @@ class Filter : public Http::StreamFilter,
Thread::MutexBasicLockable mutex_{};
bool has_destroyed_ ABSL_GUARDED_BY(mutex_){false};

bool is_golang_processing_stream_complete_{false};
bool is_golang_processing_log_{false};
};

Expand Down
2 changes: 2 additions & 0 deletions contrib/golang/filters/http/test/golang_filter_fuzz_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ DEFINE_PROTO_FUZZER(const envoy::extensions::filters::http::golang::GolangFilter
.WillByDefault(Return(static_cast<uint64_t>(GolangStatus::Continue)));
ON_CALL(*dso_lib.get(), envoyGoFilterOnHttpLog(_, _))
.WillByDefault(Invoke([&](httpRequest*, int) -> void {}));
ON_CALL(*dso_lib.get(), envoyGoFilterOnHttpStreamComplete(_))
.WillByDefault(Invoke([&](httpRequest*) -> void {}));
ON_CALL(*dso_lib.get(), envoyGoFilterOnHttpDestroy(_, _))
.WillByDefault(Invoke([&](httpRequest* p0, int) -> void {
// delete the filter->req_, make LeakSanitizer happy.
Expand Down
6 changes: 6 additions & 0 deletions contrib/golang/filters/http/test/golang_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

namespace Envoy {

using testing::HasSubstr;

// helper function
absl::string_view getHeader(const Http::HeaderMap& headers, absl::string_view key) {
auto values = headers.get(Http::LowerCaseString(key));
Expand Down Expand Up @@ -888,6 +890,7 @@ TEST_P(GolangIntegrationTest, Property) {
}

TEST_P(GolangIntegrationTest, AccessLog) {
useAccessLog("%DYNAMIC_METADATA(golang:access_log_var)%");
initializeBasicFilter(ACCESSLOG, "test.com");

auto path = "/test";
Expand Down Expand Up @@ -918,6 +921,9 @@ TEST_P(GolangIntegrationTest, AccessLog) {
ASSERT_TRUE(response->waitForEndStream());
codec_client_->close();

std::string log = waitForAccessLog(access_log_name_);
EXPECT_THAT(log, HasSubstr("access_log_var written by Golang filter"));

// use the second request to get the logged data
codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http"))));
response = sendRequestAndWaitForResponse(request_headers, 0, default_response_headers_, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ func (f *filter) OnLogDownstreamPeriodic() {
}()
}

func (f *filter) OnStreamComplete() {
f.callbacks.StreamInfo().DynamicMetadata().Set("golang", "access_log_var", "access_log_var written by Golang filter")
}

func (f *filter) OnLog() {
code, ok := f.callbacks.StreamInfo().ResponseCode()
if !ok {
Expand Down

0 comments on commit e558920

Please sign in to comment.