Skip to content

Commit

Permalink
node driver registrar should use CSI connection lib
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewsykim committed Feb 8, 2019
1 parent c6e7e83 commit cc08995
Show file tree
Hide file tree
Showing 8 changed files with 429 additions and 54 deletions.
12 changes: 8 additions & 4 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

[[constraint]]
name = "github.com/kubernetes-csi/csi-lib-utils"
version = "0.1.0"
version = "0.3.0"

[prune]
non-go = true
Expand Down
8 changes: 6 additions & 2 deletions cmd/csi-node-driver-registrar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (

// Command line flags
var (
connectionTimeout = flag.Duration("connection-timeout", 1*time.Minute, "Timeout for waiting for CSI driver socket.")
connectionTimeout = flag.Duration("connection-timeout", 0, "The --connection-timeout flag is deprecated")
csiAddress = flag.String("csi-address", "/run/csi/socket", "Path of the CSI driver socket that the node-driver-registrar will connect to.")
kubeletRegistrationPath = flag.String("kubelet-registration-path", "", "Path of the CSI driver socket on the Kubernetes host machine.")
showVersion = flag.Bool("version", false, "Show version.")
Expand Down Expand Up @@ -108,13 +108,17 @@ func main() {
}
klog.Infof("Version: %s", version)

if *connectionTimeout != 0 {
klog.Warning("--connection-timeout is deprecated and will have no effect")
}

// Once https://github.com/container-storage-interface/spec/issues/159 is
// resolved, if plugin does not support PUBLISH_UNPUBLISH_VOLUME, then we
// can skip adding mapping to "csi.volume.kubernetes.io/nodeid" annotation.

// Connect to CSI.
klog.V(1).Infof("Attempting to open a gRPC connection with: %q", *csiAddress)
csiConn, err := connection.NewConnection(*csiAddress, *connectionTimeout)
csiConn, err := connection.NewConnection(*csiAddress)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
Expand Down
41 changes: 3 additions & 38 deletions pkg/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,12 @@ package connection
import (
"context"
"fmt"
"net"
"strings"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-lib-utils/connection"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/status"
"k8s.io/klog"
)
Expand Down Expand Up @@ -56,8 +53,8 @@ var (

//NewConnection return a grpc connection object to a remote CSI driver.
func NewConnection(
address string, timeout time.Duration) (CSIConnection, error) {
conn, err := connect(address, timeout)
address string) (CSIConnection, error) {
conn, err := connection.Connect(address)
if err != nil {
return nil, err
}
Expand All @@ -66,38 +63,6 @@ func NewConnection(
}, nil
}

func connect(address string, timeout time.Duration) (*grpc.ClientConn, error) {
klog.V(2).Infof("Connecting to %s", address)
dialOptions := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithBackoffMaxDelay(time.Second),
grpc.WithUnaryInterceptor(logGRPC),
}
if strings.HasPrefix(address, "/") {
dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}))
}
conn, err := grpc.Dial(address, dialOptions...)

if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
for {
if !conn.WaitForStateChange(ctx, conn.GetState()) {
klog.V(4).Infof("Connection timed out")
return conn, nil // return nil, subsequent GetPluginInfo will show the real connection error
}
if conn.GetState() == connectivity.Ready {
klog.V(3).Infof("Connected")
return conn, nil
}
klog.V(4).Infof("Still trying, connection is %s", conn.GetState())
}
}

func (c *csiConnection) GetDriverName(ctx context.Context) (string, error) {
client := csi.NewIdentityClient(c.conn)

Expand Down
2 changes: 1 addition & 1 deletion pkg/connection/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func createMockServer(t *testing.T) (

// Create a client connection to it
addr := drv.Address()
csiConn, err := NewConnection(addr, 10)
csiConn, err := NewConnection(addr)
if err != nil {
return nil, nil, nil, nil, nil, nil, err
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit cc08995

Please sign in to comment.