Skip to content

Commit

Permalink
internal/xds/v3: Improve logging on connection close (#4993)
Browse files Browse the repository at this point in the history
- contour xDS server does not log errors if connection was canceled by
client, detected via gRPC code
- when connection is closed change to debug from info level to avoid log
noise
- add stream open/close logging callbacks for go-control-plane
implementation

Signed-off-by: Sunjay Bhatia <[email protected]>
  • Loading branch information
sunjayBhatia authored Jan 23, 2023
1 parent 33964d6 commit bf8ffcc
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 7 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/4993-sunjayBhatia-small.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve xDS server logging on connection close to be less verbose by default. Previously all closed connections from Envoy xDS resource subscriptions were logged as errors.
26 changes: 26 additions & 0 deletions internal/xds/v3/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
package v3

import (
"context"
"fmt"

envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
envoy_server_v3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
"github.com/sirupsen/logrus"
Expand All @@ -27,13 +29,37 @@ import (
// OnStreamRequest is implemented.
func NewRequestLoggingCallbacks(log logrus.FieldLogger) envoy_server_v3.Callbacks {
return &envoy_server_v3.CallbackFuncs{
StreamOpenFunc: func(ctx context.Context, streamID int64, typeURL string) error {
logStreamOpenDetails(log, streamID, typeURL)
return nil
},
StreamClosedFunc: func(streamID int64, node *envoy_config_core_v3.Node) {
logStreamClosedDetails(log, streamID, node)
},
StreamRequestFunc: func(streamID int64, req *envoy_service_discovery_v3.DiscoveryRequest) error {
logDiscoveryRequestDetails(log, req)
return nil
},
}
}

// Helper function for use in the Envoy xDS server callbacks to
// log details of opened streams.
func logStreamOpenDetails(l logrus.FieldLogger, streamID int64, typeURL string) {
l.WithField("type_url", typeURL).WithField("stream_id", streamID).Debug("stream opened")
}

// Helper function for use in the Envoy xDS server callbacks to
// log details of closed streams.
func logStreamClosedDetails(l logrus.FieldLogger, streamID int64, node *envoy_config_core_v3.Node) {
log := l.WithField("stream_id", streamID)
if node != nil {
log = log.WithField("node_id", node.Id)
}

log.Debug("stream closed")
}

// Helper function for use in the Envoy xDS server callbacks and the Contour
// xDS server to log request details. Returns logger with fields added for any
// subsequent error handling and logging.
Expand Down
52 changes: 51 additions & 1 deletion internal/xds/v3/callbacks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package v3

import (
"context"
"fmt"
"testing"

Expand All @@ -27,6 +28,44 @@ import (
"google.golang.org/genproto/googleapis/rpc/status"
)

func TestLogStreamOpenDetails(t *testing.T) {
log, logHook := test.NewNullLogger()
log.SetLevel(logrus.DebugLevel)

logStreamOpenDetails(log, 66, "some-type")
assert.Len(t, logHook.AllEntries(), 1)
entry := logHook.AllEntries()[0]
assert.Equal(t, "stream opened", entry.Message)
assert.Equal(t, logrus.Fields{
"stream_id": int64(66),
"type_url": "some-type",
}, entry.Data)
}

func TestLogStreamClosedDetails(t *testing.T) {
log, logHook := test.NewNullLogger()
log.SetLevel(logrus.DebugLevel)

logStreamClosedDetails(log, 65, nil)
assert.Len(t, logHook.AllEntries(), 1)
entry := logHook.AllEntries()[0]
assert.Equal(t, "stream closed", entry.Message)
assert.Equal(t, logrus.Fields{
"stream_id": int64(65),
}, entry.Data)
logHook.Reset()

logStreamClosedDetails(log, 65, &envoy_config_core_v3.Node{Id: "foo"})
assert.Len(t, logHook.AllEntries(), 1)
entry = logHook.AllEntries()[0]
assert.Equal(t, "stream closed", entry.Message)
assert.Equal(t, logrus.Fields{
"stream_id": int64(65),
"node_id": "foo",
}, entry.Data)
logHook.Reset()
}

func TestLogDiscoveryRequestDetails(t *testing.T) {
log, logHook := test.NewNullLogger()
log.SetLevel(logrus.DebugLevel)
Expand Down Expand Up @@ -140,12 +179,23 @@ func TestOnStreamRequestCallbackLogs(t *testing.T) {
log.SetLevel(logrus.DebugLevel)

callbacks := NewRequestLoggingCallbacks(log)
err := callbacks.OnStreamRequest(999, &envoy_service_discovery_v3.DiscoveryRequest{

err := callbacks.OnStreamOpen(context.TODO(), 999, "a-type")
assert.NoError(t, err)
assert.NotEmpty(t, logHook.AllEntries())
logHook.Reset()

callbacks.OnStreamClosed(999, &envoy_config_core_v3.Node{Id: "envoy-1234"})
assert.NotEmpty(t, logHook.AllEntries())
logHook.Reset()

err = callbacks.OnStreamRequest(999, &envoy_service_discovery_v3.DiscoveryRequest{
VersionInfo: "req-version",
ResponseNonce: "resp-nonce",
ResourceNames: []string{"some", "resources"},
TypeUrl: "some-type-url",
})
assert.NoError(t, err)
assert.NotEmpty(t, logHook.AllEntries())
logHook.Reset()
}
13 changes: 8 additions & 5 deletions internal/xds/v3/contour.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
envoy_service_secret_v3 "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3"
"github.com/projectcontour/contour/internal/xds"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)
Expand Down Expand Up @@ -77,13 +79,14 @@ func (s *contourServer) stream(st grpcStream) error {

// Notify whether the stream terminated on error.
done := func(log logrus.FieldLogger, err error) error {
if err != nil {
// If the stream has been closed by the client "gracefully",
// do not log as an error.
if err != nil && err != context.Canceled && status.Code(err) != codes.Canceled {
log.WithError(err).Error("stream terminated")
} else {
log.Info("stream terminated")
return err
}

return err
log.Debug("stream terminated")
return nil
}

ch := make(chan int, 1)
Expand Down
57 changes: 56 additions & 1 deletion internal/xds/v3/contour_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package v3

import (
"context"
"errors"
"fmt"
"io"
"testing"
Expand All @@ -23,7 +24,10 @@ import (
envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/projectcontour/contour/internal/xds"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/runtime/protoimpl"
)
Expand Down Expand Up @@ -139,7 +143,7 @@ func TestXDSHandlerStream(t *testing.T) {
return io.EOF
},
},
want: context.Canceled,
want: nil,
},
}

Expand All @@ -151,6 +155,57 @@ func TestXDSHandlerStream(t *testing.T) {
}
}

func TestStreamLoggingConnectionClose(t *testing.T) {
log, logHook := test.NewNullLogger()
log.SetLevel(logrus.DebugLevel)

tests := map[string]struct {
closeErr error
wantErr bool
wantLogLevel logrus.Level
}{
"connection closed w/ context.Canceled": {
closeErr: context.Canceled,
wantLogLevel: logrus.DebugLevel,
},
"connection closed w/ rpc error": {
closeErr: status.Error(codes.Canceled, "error canceled"),
wantLogLevel: logrus.DebugLevel,
},
"connection closed w/ some other error": {
closeErr: errors.New("some other error"),
wantLogLevel: logrus.ErrorLevel,
wantErr: true,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
server := contourServer{FieldLogger: log}
stream := &mockStream{
context: context.Background,
recv: func() (*envoy_service_discovery_v3.DiscoveryRequest, error) {
return nil, tc.closeErr
},
}
err := server.stream(stream)

assert.Len(t, logHook.AllEntries(), 1)
entry := logHook.AllEntries()[0]
assert.Equal(t, "stream terminated", entry.Message)
assert.Equal(t, tc.wantLogLevel, entry.Level)

if tc.wantErr {
assert.Equal(t, tc.closeErr, err)
assert.Equal(t, tc.closeErr, entry.Data["error"])
} else {
assert.Nil(t, err)
}

logHook.Reset()
})
}
}

type mockStream struct {
context func() context.Context
send func(*envoy_service_discovery_v3.DiscoveryResponse) error
Expand Down

0 comments on commit bf8ffcc

Please sign in to comment.