diff --git a/changelogs/unreleased/4993-sunjayBhatia-small.md b/changelogs/unreleased/4993-sunjayBhatia-small.md new file mode 100644 index 00000000000..de6c41a42ae --- /dev/null +++ b/changelogs/unreleased/4993-sunjayBhatia-small.md @@ -0,0 +1 @@ +Improve xDS server logging on connection close to be less verbose by default. Previously all closed connections from Envoy xDS resource subscriptions were logged as errors. diff --git a/internal/xds/v3/callbacks.go b/internal/xds/v3/callbacks.go index aafdc596f04..8ada4603ee3 100644 --- a/internal/xds/v3/callbacks.go +++ b/internal/xds/v3/callbacks.go @@ -14,8 +14,10 @@ package v3 import ( + "context" "fmt" + envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" envoy_server_v3 "github.com/envoyproxy/go-control-plane/pkg/server/v3" "github.com/sirupsen/logrus" @@ -27,6 +29,13 @@ import ( // OnStreamRequest is implemented. func NewRequestLoggingCallbacks(log logrus.FieldLogger) envoy_server_v3.Callbacks { return &envoy_server_v3.CallbackFuncs{ + StreamOpenFunc: func(ctx context.Context, streamID int64, typeURL string) error { + logStreamOpenDetails(log, streamID, typeURL) + return nil + }, + StreamClosedFunc: func(streamID int64, node *envoy_config_core_v3.Node) { + logStreamClosedDetails(log, streamID, node) + }, StreamRequestFunc: func(streamID int64, req *envoy_service_discovery_v3.DiscoveryRequest) error { logDiscoveryRequestDetails(log, req) return nil @@ -34,6 +43,23 @@ func NewRequestLoggingCallbacks(log logrus.FieldLogger) envoy_server_v3.Callback } } +// Helper function for use in the Envoy xDS server callbacks to +// log details of opened streams. +func logStreamOpenDetails(l logrus.FieldLogger, streamID int64, typeURL string) { + l.WithField("type_url", typeURL).WithField("stream_id", streamID).Debug("stream opened") +} + +// Helper function for use in the Envoy xDS server callbacks to +// log details of closed streams. +func logStreamClosedDetails(l logrus.FieldLogger, streamID int64, node *envoy_config_core_v3.Node) { + log := l.WithField("stream_id", streamID) + if node != nil { + log = log.WithField("node_id", node.Id) + } + + log.Debug("stream closed") +} + // Helper function for use in the Envoy xDS server callbacks and the Contour // xDS server to log request details. Returns logger with fields added for any // subsequent error handling and logging. diff --git a/internal/xds/v3/callbacks_test.go b/internal/xds/v3/callbacks_test.go index b6eedb95495..cfc0ccbf1a4 100644 --- a/internal/xds/v3/callbacks_test.go +++ b/internal/xds/v3/callbacks_test.go @@ -14,6 +14,7 @@ package v3 import ( + "context" "fmt" "testing" @@ -27,6 +28,44 @@ import ( "google.golang.org/genproto/googleapis/rpc/status" ) +func TestLogStreamOpenDetails(t *testing.T) { + log, logHook := test.NewNullLogger() + log.SetLevel(logrus.DebugLevel) + + logStreamOpenDetails(log, 66, "some-type") + assert.Len(t, logHook.AllEntries(), 1) + entry := logHook.AllEntries()[0] + assert.Equal(t, "stream opened", entry.Message) + assert.Equal(t, logrus.Fields{ + "stream_id": int64(66), + "type_url": "some-type", + }, entry.Data) +} + +func TestLogStreamClosedDetails(t *testing.T) { + log, logHook := test.NewNullLogger() + log.SetLevel(logrus.DebugLevel) + + logStreamClosedDetails(log, 65, nil) + assert.Len(t, logHook.AllEntries(), 1) + entry := logHook.AllEntries()[0] + assert.Equal(t, "stream closed", entry.Message) + assert.Equal(t, logrus.Fields{ + "stream_id": int64(65), + }, entry.Data) + logHook.Reset() + + logStreamClosedDetails(log, 65, &envoy_config_core_v3.Node{Id: "foo"}) + assert.Len(t, logHook.AllEntries(), 1) + entry = logHook.AllEntries()[0] + assert.Equal(t, "stream closed", entry.Message) + assert.Equal(t, logrus.Fields{ + "stream_id": int64(65), + "node_id": "foo", + }, entry.Data) + logHook.Reset() +} + func TestLogDiscoveryRequestDetails(t *testing.T) { log, logHook := test.NewNullLogger() log.SetLevel(logrus.DebugLevel) @@ -140,7 +179,17 @@ func TestOnStreamRequestCallbackLogs(t *testing.T) { log.SetLevel(logrus.DebugLevel) callbacks := NewRequestLoggingCallbacks(log) - err := callbacks.OnStreamRequest(999, &envoy_service_discovery_v3.DiscoveryRequest{ + + err := callbacks.OnStreamOpen(context.TODO(), 999, "a-type") + assert.NoError(t, err) + assert.NotEmpty(t, logHook.AllEntries()) + logHook.Reset() + + callbacks.OnStreamClosed(999, &envoy_config_core_v3.Node{Id: "envoy-1234"}) + assert.NotEmpty(t, logHook.AllEntries()) + logHook.Reset() + + err = callbacks.OnStreamRequest(999, &envoy_service_discovery_v3.DiscoveryRequest{ VersionInfo: "req-version", ResponseNonce: "resp-nonce", ResourceNames: []string{"some", "resources"}, @@ -148,4 +197,5 @@ func TestOnStreamRequestCallbackLogs(t *testing.T) { }) assert.NoError(t, err) assert.NotEmpty(t, logHook.AllEntries()) + logHook.Reset() } diff --git a/internal/xds/v3/contour.go b/internal/xds/v3/contour.go index d015279d491..12edbadf31d 100644 --- a/internal/xds/v3/contour.go +++ b/internal/xds/v3/contour.go @@ -27,6 +27,8 @@ import ( envoy_service_secret_v3 "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3" "github.com/projectcontour/contour/internal/xds" "github.com/sirupsen/logrus" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" ) @@ -77,13 +79,14 @@ func (s *contourServer) stream(st grpcStream) error { // Notify whether the stream terminated on error. done := func(log logrus.FieldLogger, err error) error { - if err != nil { + // If the stream has been closed by the client "gracefully", + // do not log as an error. + if err != nil && err != context.Canceled && status.Code(err) != codes.Canceled { log.WithError(err).Error("stream terminated") - } else { - log.Info("stream terminated") + return err } - - return err + log.Debug("stream terminated") + return nil } ch := make(chan int, 1) diff --git a/internal/xds/v3/contour_test.go b/internal/xds/v3/contour_test.go index e9164faff53..aa1b22d62db 100644 --- a/internal/xds/v3/contour_test.go +++ b/internal/xds/v3/contour_test.go @@ -15,6 +15,7 @@ package v3 import ( "context" + "errors" "fmt" "io" "testing" @@ -23,7 +24,10 @@ import ( envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/projectcontour/contour/internal/xds" "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/runtime/protoimpl" ) @@ -139,7 +143,7 @@ func TestXDSHandlerStream(t *testing.T) { return io.EOF }, }, - want: context.Canceled, + want: nil, }, } @@ -151,6 +155,57 @@ func TestXDSHandlerStream(t *testing.T) { } } +func TestStreamLoggingConnectionClose(t *testing.T) { + log, logHook := test.NewNullLogger() + log.SetLevel(logrus.DebugLevel) + + tests := map[string]struct { + closeErr error + wantErr bool + wantLogLevel logrus.Level + }{ + "connection closed w/ context.Canceled": { + closeErr: context.Canceled, + wantLogLevel: logrus.DebugLevel, + }, + "connection closed w/ rpc error": { + closeErr: status.Error(codes.Canceled, "error canceled"), + wantLogLevel: logrus.DebugLevel, + }, + "connection closed w/ some other error": { + closeErr: errors.New("some other error"), + wantLogLevel: logrus.ErrorLevel, + wantErr: true, + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + server := contourServer{FieldLogger: log} + stream := &mockStream{ + context: context.Background, + recv: func() (*envoy_service_discovery_v3.DiscoveryRequest, error) { + return nil, tc.closeErr + }, + } + err := server.stream(stream) + + assert.Len(t, logHook.AllEntries(), 1) + entry := logHook.AllEntries()[0] + assert.Equal(t, "stream terminated", entry.Message) + assert.Equal(t, tc.wantLogLevel, entry.Level) + + if tc.wantErr { + assert.Equal(t, tc.closeErr, err) + assert.Equal(t, tc.closeErr, entry.Data["error"]) + } else { + assert.Nil(t, err) + } + + logHook.Reset() + }) + } +} + type mockStream struct { context func() context.Context send func(*envoy_service_discovery_v3.DiscoveryResponse) error