Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support structured logging #149

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,9 @@ all:
go build `go list ./... | grep -v 'vendor'`

include release-tools/build.make

# Check contextual logging.
.PHONY: logcheck
test: logcheck
logcheck:
hack/verify-logcheck.sh
68 changes: 43 additions & 25 deletions connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/klog/v2"
)

Expand Down Expand Up @@ -73,21 +75,21 @@ func SetMaxGRPCLogLength(characterCount int) {
//
// For other connections, the default behavior from gRPC is used and
// loss of connection is not detected reliably.
func Connect(address string, metricsManager metrics.CSIMetricsManager, options ...Option) (*grpc.ClientConn, error) {
func Connect(ctx context.Context, address string, metricsManager metrics.CSIMetricsManager, options ...Option) (*grpc.ClientConn, error) {
// Prepend default options
options = append([]Option{WithTimeout(time.Second * 30)}, options...)
if metricsManager != nil {
options = append([]Option{WithMetrics(metricsManager)}, options...)
}
return connect(address, options)
return connect(ctx, address, options)
}

// ConnectWithoutMetrics behaves exactly like Connect except no metrics are recorded.
// This function is deprecated, prefer using Connect with `nil` as the metricsManager.
func ConnectWithoutMetrics(address string, options ...Option) (*grpc.ClientConn, error) {
func ConnectWithoutMetrics(ctx context.Context, address string, options ...Option) (*grpc.ClientConn, error) {
// Prepend default options
options = append([]Option{WithTimeout(time.Second * 30)}, options...)
return connect(address, options)
return connect(ctx, address, options)
}

// Option is the type of all optional parameters for Connect.
Expand All @@ -97,27 +99,33 @@ type Option func(o *options)
// connection got lost. If that callback returns true, the connection
// is reestablished. Otherwise the connection is left as it is and
// all future gRPC calls using it will fail with status.Unavailable.
func OnConnectionLoss(reconnect func() bool) Option {
func OnConnectionLoss(reconnect func(context.Context) bool) Option {
return func(o *options) {
o.reconnect = reconnect
}
}

// ExitOnConnectionLoss returns a callback for OnConnectionLoss() that writes
// an error to /dev/termination-log and exits.
func ExitOnConnectionLoss() func() bool {
return func() bool {
func ExitOnConnectionLoss() func(context.Context) bool {
pohly marked this conversation as resolved.
Show resolved Hide resolved
return func(ctx context.Context) bool {
terminationMsg := "Lost connection to CSI driver, exiting"
if err := os.WriteFile(terminationLogPath, []byte(terminationMsg), 0644); err != nil {
klog.Errorf("%s: %s", terminationLogPath, err)
klog.FromContext(ctx).Error(err, "Failed to write a message to the termination logfile", "terminationLogPath", terminationLogPath)
}
klog.Exit(terminationMsg)
klog.FromContext(ctx).Error(nil, terminationMsg)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
// Not reached.
return false
}
}

// WithTimeout adds a configurable timeout on the gRPC calls.
// Note that this timeout also prevents all attempts to reconnect
// because it uses context.WithTimeout internally.
//
// For more details, see https://github.com/grpc/grpc-go/issues/133
// and https://github.com/kubernetes-csi/csi-lib-utils/pull/149#discussion_r1574707477
func WithTimeout(timeout time.Duration) Option {
return func(o *options) {
o.timeout = timeout
Expand All @@ -139,30 +147,36 @@ func WithOtelTracing() Option {
}

type options struct {
reconnect func() bool
reconnect func(context.Context) bool
timeout time.Duration
metricsManager metrics.CSIMetricsManager
enableOtelTracing bool
}

// connect is the internal implementation of Connect. It has more options to enable testing.
func connect(
ctx context.Context,
pohly marked this conversation as resolved.
Show resolved Hide resolved
address string,
connectOptions []Option) (*grpc.ClientConn, error) {
logger := klog.FromContext(ctx)
var o options
for _, option := range connectOptions {
option(&o)
}

bc := backoff.DefaultConfig
bc.MaxDelay = time.Second
dialOptions := []grpc.DialOption{
grpc.WithInsecure(), // Don't use TLS, it's usually local Unix domain socket in a container.
grpc.WithBackoffMaxDelay(time.Second), // Retry every second after failure.
grpc.WithTransportCredentials(insecure.NewCredentials()), // Don't use TLS, it's usually local Unix domain socket in a container.
grpc.WithConnectParams(grpc.ConnectParams{Backoff: bc}), // Retry every second after failure.
grpc.WithBlock(), // Block until connection succeeds.
grpc.WithIdleTimeout(time.Duration(0)), // Never close connection because of inactivity.
}

if o.timeout > 0 {
dialOptions = append(dialOptions, grpc.WithTimeout(o.timeout))
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, o.timeout)
defer cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to myself: this works because connect waits for dialing to finish or fail. Otherwise returning would cause an on-going dial attempt to be aborted.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is one problem that was already in the original code: as described in grpc/grpc-go#133, WithTimeout also prevents all future attempts to reconnect. That might be surprising when someone wants to use Connect with an OnConnectionLoss callback that doesn't abort.

Can you add a comment to WithTimeout explaining that setting a timeout prevents reconnecting?

Copy link
Contributor Author

@bells17 bells17 Apr 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant adding the comment to func WithTimeout(...). Then it becomes part of the API documentation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pohly Sorry for my misunderstanding. I have removed the comment.

Regarding this:

I meant adding the comment to func WithTimeout(...). Then it becomes part of the API documentation.

I'm not fully understanding the implication, so I would like to clarify the meaning.
Is this suggesting that a comment should be added to the following page?
https://pkg.go.dev/context#WithTimeout

If my understanding above is correct, please let me know if there's anything else I should do to clarify or address this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. I meant adding it to

// WithTimeout adds a configurable timeout on the gRPC calls.
func WithTimeout(timeout time.Duration) Option {
return func(o *options) {
o.timeout = timeout
}
}

}

interceptors := []grpc.UnaryClientInterceptor{LogGRPC}
Expand All @@ -186,20 +200,25 @@ func connect(
lostConnection := false
reconnect := true

dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
dialOptions = append(dialOptions, grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
logger := klog.FromContext(ctx)
if haveConnected && !lostConnection {
// We have detected a loss of connection for the first time. Decide what to do...
// Record this once. TODO (?): log at regular time intervals.
klog.Errorf("Lost connection to %s.", address)
logger.Error(nil, "Lost connection", "address", address)
pohly marked this conversation as resolved.
Show resolved Hide resolved
// Inform caller and let it decide? Default is to reconnect.
if o.reconnect != nil {
reconnect = o.reconnect()
reconnect = o.reconnect(ctx)
}
lostConnection = true
}
if !reconnect {
return nil, errors.New("connection lost, reconnecting disabled")
}
var timeout time.Duration
if deadline, ok := ctx.Deadline(); ok {
timeout = time.Until(deadline)
}
pohly marked this conversation as resolved.
Show resolved Hide resolved
conn, err := net.DialTimeout("unix", address[len(unixPrefix):], timeout)
if err == nil {
// Connection reestablished.
Expand All @@ -212,14 +231,14 @@ func connect(
return nil, errors.New("OnConnectionLoss callback only supported for unix:// addresses")
}

klog.V(5).Infof("Connecting to %s", address)
logger.V(5).Info("Connecting", "address", address)

// Connect in background.
var conn *grpc.ClientConn
var err error
ready := make(chan bool)
go func() {
conn, err = grpc.Dial(address, dialOptions...)
conn, err = grpc.DialContext(ctx, address, dialOptions...)
close(ready)
}()

Expand All @@ -231,7 +250,7 @@ func connect(
for {
select {
case <-ticker.C:
klog.Warningf("Still connecting to %s", address)
logger.Info("Still connecting", "address", address)

case <-ready:
return conn, err
Expand All @@ -241,15 +260,14 @@ func connect(

// LogGRPC is a gRPC unary interceptor for logging of CSI messages at level 5. It removes any secrets from the message.
func LogGRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
klog.V(5).Infof("GRPC call: %s", method)
klog.V(5).Infof("GRPC request: %s", protosanitizer.StripSecrets(req))
logger := klog.FromContext(ctx)
pohly marked this conversation as resolved.
Show resolved Hide resolved
logger.V(5).Info("GRPC call", "method", method, "request", protosanitizer.StripSecrets(req))
err := invoker(ctx, method, req, reply, cc, opts...)
cappedStr := protosanitizer.StripSecrets(reply).String()
if maxLogChar > 0 && len(cappedStr) > maxLogChar {
cappedStr = cappedStr[:maxLogChar] + fmt.Sprintf(" [response body too large, log capped to %d chars]", maxLogChar)
}
klog.V(5).Infof("GRPC response: %s", cappedStr)
klog.V(5).Infof("GRPC error: %v", err)
logger.V(5).Info("GRPC response", "response", cappedStr, "err", err)
return err
}

Expand Down Expand Up @@ -286,14 +304,14 @@ func (cmm ExtendedCSIMetricsManager) RecordMetricsClientInterceptor(
if additionalInfo != nil {
additionalInfoVal, ok := additionalInfo.(AdditionalInfo)
if !ok {
klog.Errorf("Failed to record migrated status, cannot convert additional info %v", additionalInfo)
klog.FromContext(ctx).Error(nil, "Failed to record migrated status, cannot convert additional info", "additionalInfo", additionalInfo)
return err
}
migrated = additionalInfoVal.Migrated
}
cmmv, metricsErr := cmm.WithLabelValues(map[string]string{metrics.LabelMigrated: migrated})
if metricsErr != nil {
klog.Errorf("Failed to record migrated status, error: %v", metricsErr)
klog.FromContext(ctx).Error(metricsErr, "Failed to record migrated status")
} else {
cmmBase = cmmv
}
Expand Down
Loading