Skip to content

Commit

Permalink
Merge pull request kubernetes-csi#1 from pohly/connection-tests
Browse files Browse the repository at this point in the history
connection: add tests
  • Loading branch information
jsafrane authored Feb 4, 2019
2 parents 635b0f9 + fad9258 commit 23d286e
Show file tree
Hide file tree
Showing 8 changed files with 2,653 additions and 6 deletions.
9 changes: 7 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

85 changes: 81 additions & 4 deletions connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package connection

import (
"context"
"errors"
"net"
"strings"
"time"

Expand All @@ -34,19 +36,94 @@ const (
// Connect opens insecure gRPC connection to a CSI driver. Address must be either absolute path to UNIX domain socket
// file or have format '<protocol>://', following gRPC name resolution mechanism at
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
//
// The function tries to connect indefinitely every second until it connects. The function automatically disables TLS
// and adds interceptor for logging of all gRPC messages at level 5.
func Connect(address string) (*grpc.ClientConn, error) {
dialOptions := []grpc.DialOption{
//
// For a connection to a Unix Domain socket, the behavior after
// loosing the connection is configurable. The default is to
// log the connection loss and reestablish a connection. Applications
// which need to know about a connection loss can be notified by
// passing a callback with OnConnectionLoss and in that callback
// can decide what to do:
// - exit the application with os.Exit
// - invalidate cached information
// - disable the reconnect, which will cause all gRPC method calls to fail with status.Unavailable
//
// For other connections, the default behavior from gRPC is used and
// loss of connection is not detected reliably.
func Connect(address string, options ...Option) (*grpc.ClientConn, error) {
return connect(address, []grpc.DialOption{}, options)
}

// Option is the type of all optional parameters for Connect.
type Option func(o *options)

// OnConnectionLoss registers a callback that will be invoked when the
// connection got lost. If that callback returns true, the connection
// is restablished. Otherwise the connection is left as it is and
// all future gRPC calls using it will fail with status.Unavailable.
func OnConnectionLoss(reconnect func() bool) Option {
return func(o *options) {
o.reconnect = reconnect
}
}

type options struct {
reconnect func() bool
}

// connect is the internal implementation of Connect. It has more options to enable testing.
func connect(address string, dialOptions []grpc.DialOption, connectOptions []Option) (*grpc.ClientConn, error) {
var o options
for _, option := range connectOptions {
option(&o)
}

dialOptions = append(dialOptions,
grpc.WithInsecure(), // Don't use TLS, it's usually local Unix domain socket in a container.
grpc.WithBackoffMaxDelay(time.Second), // Retry every second after failure.
grpc.WithBlock(), // Block until connection succeeds.
grpc.WithUnaryInterceptor(LogGRPC), // Log all messages.
}
)
unixPrefix := "unix://"
if strings.HasPrefix(address, "/") {
// It looks like filesystem path.
address = "unix://" + address
address = unixPrefix + address
}

if strings.HasPrefix(address, unixPrefix) {
// state variables for the custom dialer
haveConnected := false
lostConnection := false
reconnect := true

dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
if haveConnected && !lostConnection {
// We have detected a loss of connection for the first time. Decide what to do...
// Record this once. TODO (?): log at regular time intervals.
glog.Errorf("Lost connection to %s.", address)
// Inform caller and let it decide? Default is to reconnect.
if o.reconnect != nil {
reconnect = o.reconnect()
}
lostConnection = true
}
if !reconnect {
return nil, errors.New("connection lost, reconnecting disabled")
}
conn, err := net.DialTimeout("unix", address[len(unixPrefix):], timeout)
if err == nil {
// Connection restablished.
haveConnected = true
lostConnection = false
}
return conn, err
}))
} else if o.reconnect != nil {
return nil, errors.New("OnConnectionLoss callback only supported for unix:// addresses")
}

glog.Infof("Connecting to %s", address)

// Connect in background.
Expand Down
Loading

0 comments on commit 23d286e

Please sign in to comment.