diff --git a/pkg/local/client/grpc.go b/pkg/local/client/grpc.go index 06b199772..9113c44a5 100644 --- a/pkg/local/client/grpc.go +++ b/pkg/local/client/grpc.go @@ -20,16 +20,16 @@ import ( "context" "encoding/json" "fmt" + "net" + "strings" + "time" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/local/lib" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/local/manager" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils" log "github.com/sirupsen/logrus" "google.golang.org/grpc" - "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" - "net" - "strings" - "time" ) // Connection lvm connection interface @@ -103,30 +103,17 @@ func connect(address string, timeout time.Duration, creds credentials.TransportC grpc.WithTransportCredentials(creds), grpc.WithBackoffMaxDelay(time.Second), grpc.WithUnaryInterceptor(logGRPC), + grpc.WithBlock(), } if strings.HasPrefix(address, "/") { dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { return net.DialTimeout("unix", addr, timeout) })) } - conn, err := grpc.Dial(address, dialOptions...) - - if err != nil { - return nil, err - } ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - for { - if !conn.WaitForStateChange(ctx, conn.GetState()) { - log.Warnf("Connection to %s timed out", address) - return conn, nil // return nil, subsequent GetPluginInfo will show the real connection error - } - if conn.GetState() == connectivity.Ready { - log.Warnf("Connected to %s", address) - return conn, nil - } - log.Infof("Still trying to connect %s, connection is %s", address, conn.GetState()) - } + + return grpc.DialContext(ctx, address, dialOptions...) } func (c *workerConnection) CreateLoopDevice(ctx context.Context, pvName, quotaSize string) (string, error) { diff --git a/pkg/local/controllerserver.go b/pkg/local/controllerserver.go index 14c8aa7f4..d9cb5028f 100644 --- a/pkg/local/controllerserver.go +++ b/pkg/local/controllerserver.go @@ -37,12 +37,15 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/record" ) type controllerServer struct { *csicommon.DefaultControllerServer client kubernetes.Interface + recorder record.EventRecorder caCertFile string clientCertFile string clientKeyFile string @@ -100,6 +103,7 @@ func newControllerServer(d *csicommon.CSIDriver, caCertFile string, clientCertFi return &controllerServer{ DefaultControllerServer: csicommon.NewDefaultControllerServer(d), client: types.GlobalConfigVar.KubeClient, + recorder: utils.NewEventRecorder(), caCertFile: caCertFile, clientCertFile: clientCertFile, clientKeyFile: clientKeyFile, @@ -444,13 +448,36 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol func (cs *controllerServer) getNodeConn(nodeSelected string, caCertFile string, clientCertFile string, clientKeyFile string) (client.Connection, error) { addr, err := utils.GetNodeAddr(cs.client, nodeSelected, server.GetLvmdPort()) if err != nil { - log.Errorf("CreateVolume: Get node %s address with error: %s", nodeSelected, err.Error()) + log.Errorf("Get node %s address with error: %s", nodeSelected, err.Error()) return nil, err } conn, err := client.NewGrpcConnection(addr, connectTimeout, caCertFile, clientCertFile, clientKeyFile) + if errors.Is(err, context.DeadlineExceeded) { + // Node IP may be changed, try to get new IP + utils.ClearNodeIPCache(nodeSelected) + addr2, err := utils.GetNodeAddr(cs.client, nodeSelected, server.GetLvmdPort()) + if err != nil { + log.Errorf("Get node %s address without cache failed: %s", nodeSelected, err.Error()) + return nil, err + } + if addr2 != addr { + log.Infof("Node %s address changed from %s to %s, re-connecting", nodeSelected, addr, addr2) + return client.NewGrpcConnection(addr2, connectTimeout, caCertFile, clientCertFile, clientKeyFile) + } else { + log.Errorf("Node %s address %s verified from Kubernetes, but failed to connect", nodeSelected, addr) + return nil, err + } + } return conn, err } +// Send an event then return success +func (cs *controllerServer) handleNodeDeleted(pvObj *v1.PersistentVolume, volumeDesc, nodeName string) (*csi.DeleteVolumeResponse, error) { + log.Infof("DeleteVolume: Node %s deleted, skipping delete volume %s (%s)", nodeName, pvObj.Name, volumeDesc) + cs.recorder.Eventf(pvObj, v1.EventTypeWarning, "VolumeOnDeletedNode", "Volume data may still present in %s at deleted node %s", volumeDesc, nodeName) + return &csi.DeleteVolumeResponse{}, nil +} + // DeleteVolume csi interface func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { log.Infof("DeleteVolume: deleting local volume: %s", req.GetVolumeId()) @@ -474,6 +501,9 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol if types.GlobalConfigVar.GrpcProvision && nodeName != "" { conn, err := cs.getNodeConn(nodeName, cs.caCertFile, cs.clientCertFile, cs.clientKeyFile) if err != nil { + if apierrors.IsNotFound(err) { + return cs.handleNodeDeleted(pvObj, fmt.Sprintf("LVM logical volume %s/%s", vgName, volumeID), nodeName) + } log.Errorf("DeleteVolume: New lvm %s Connection at node %s with error: %s", req.GetVolumeId(), nodeName, err.Error()) return nil, err } @@ -536,20 +566,20 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol return nil, errors.New("MountPoint Pv is illegal, No node info") } nodeName := nodes[0] + path := pvObj.Spec.CSI.VolumeAttributes[MountPointType] + if path == "" { + log.Errorf("DeleteVolume: Get MountPoint Path for volume %s, with empty", volumeID) + return nil, errors.New("MountPoint Path is empty") + } conn, err := cs.getNodeConn(nodeName, cs.caCertFile, cs.clientCertFile, cs.clientKeyFile) if err != nil { + if apierrors.IsNotFound(err) { + return cs.handleNodeDeleted(pvObj, fmt.Sprintf("mount point %s", path), nodeName) + } log.Errorf("DeleteVolume: New mountpoint %s Connection error: %s", req.GetVolumeId(), err.Error()) return nil, err } defer conn.Close() - path := "" - if value, ok := pvObj.Spec.CSI.VolumeAttributes[MountPointType]; ok { - path = value - } - if path == "" { - log.Errorf("DeleteVolume: Get MountPoint Path for volume %s, with empty", volumeID) - return nil, errors.New("MountPoint Path is empty") - } if err := conn.CleanPath(ctx, path); err != nil { log.Errorf("DeleteVolume: Remove mountpoint for %s with error: %s", req.GetVolumeId(), err.Error()) return nil, errors.New("DeleteVolume: Delete mountpoint Failed: " + err.Error()) @@ -560,17 +590,20 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol log.Infof("DeleteVolume: successful delete Device volume(%s)...", volumeID) case PmemVolumeType: if nodeName != "" { + namespace, ok := pvObj.Spec.CSI.VolumeAttributes["pmemNameSpace"] + if !ok { + log.Errorf("DeleteVolume: Direct PMEM volume can not found NameSpace: %s", volumeID) + return nil, errors.New("DeleteVolume Direct PMEM volume can not found NameSpace " + volumeID) + } conn, err := cs.getNodeConn(nodeName, cs.caCertFile, cs.clientCertFile, cs.clientKeyFile) if err != nil { + if apierrors.IsNotFound(err) { + return cs.handleNodeDeleted(pvObj, fmt.Sprintf("PMEM namespace %s", namespace), nodeName) + } log.Errorf("DeleteVolume: New PMEM %s Connection at node %s with error: %s", req.GetVolumeId(), nodeName, err.Error()) return nil, err } defer conn.Close() - if _, ok := pvObj.Spec.CSI.VolumeAttributes["pmemNameSpace"]; !ok { - log.Errorf("DeleteVolume: Direct PMEM volume can not found NameSpace: %s", volumeID) - return nil, errors.New("DeleteVolume Direct PMEM volume can not found NameSpace " + volumeID) - } - namespace := pvObj.Spec.CSI.VolumeAttributes["pmemNameSpace"] if pmemName, err := conn.GetNameSpace(ctx, "", volumeID); err == nil && pmemName != "" { if err := conn.DeleteNameSpace(ctx, namespace); err != nil { log.Errorf("DeleteVolume: Remove PMEM direct volume %s at node %s with error: %s", volumeID, nodeName, err.Error()) @@ -588,18 +621,21 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol } case QuotaPathVolumeType: if nodeName != "" { + quotaPath, ok := pvObj.Spec.CSI.VolumeAttributes[ProjQuotaFullPath] + if !ok { + log.Errorf("DeleteVolume: QuotaPath volume %s not have projQuotaFullPath parameter", req.VolumeId) + return nil, fmt.Errorf("DeleteVolume: QuotaPath volume %s not have projQuotaFullPath parameter", req.VolumeId) + } conn, err := cs.getNodeConn(nodeName, cs.caCertFile, cs.clientCertFile, cs.clientKeyFile) if err != nil { + if apierrors.IsNotFound(err) { + return cs.handleNodeDeleted(pvObj, fmt.Sprintf("path %s", quotaPath), nodeName) + } log.Errorf("DeleteVolume: get QuotaPath volume %s Connection at node %s with error: %s", req.VolumeId, nodeName, err.Error()) return nil, err } defer conn.Close() - if _, ok := pvObj.Spec.CSI.VolumeAttributes[ProjQuotaFullPath]; !ok { - log.Errorf("DeleteVolume: QuotaPath volume %s not have projQuotaFullPath parameter", req.VolumeId) - return nil, fmt.Errorf("DeleteVolume: QuotaPath volume %s not have projQuotaFullPath parameter", req.VolumeId) - } - quotaPath := pvObj.Spec.CSI.VolumeAttributes[ProjQuotaFullPath] _, err = conn.RemoveProjQuotaSubpath(ctx, quotaPath) if err != nil { log.Errorf("DeleteVolume: Remove QuotaPath volume %s at node %s with error %s", req.VolumeId, nodeName, err.Error()) @@ -611,18 +647,21 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol } case LoopDeviceVolumeType: if nodeName != "" { + loopdevicePath, ok := pvObj.Spec.CSI.VolumeAttributes[LoopDeviceFullPath] + if !ok { + log.Errorf("DeleteVolume: LoopDevice volume %s not have loopDeviceFullPath parameter", req.VolumeId) + return nil, fmt.Errorf("DeleteVolume: loopdevice volume %s not have loopDeviceFullPath parameter", req.VolumeId) + } conn, err := cs.getNodeConn(nodeName, cs.caCertFile, cs.clientCertFile, cs.clientKeyFile) if err != nil { + if apierrors.IsNotFound(err) { + return cs.handleNodeDeleted(pvObj, fmt.Sprintf("loop device path %s", loopdevicePath), nodeName) + } log.Errorf("DeleteVolume: get loopdevice volume %s Connection at node %s with error: %s", req.VolumeId, nodeName, err.Error()) return nil, err } defer conn.Close() - if _, ok := pvObj.Spec.CSI.VolumeAttributes[LoopDeviceFullPath]; !ok { - log.Errorf("DeleteVolume: LoopDevice volume %s not have loopDeviceFullPath parameter", req.VolumeId) - return nil, fmt.Errorf("DeleteVolume: loopdevice volume %s not have loopDeviceFullPath parameter", req.VolumeId) - } - loopdevicePath := pvObj.Spec.CSI.VolumeAttributes[LoopDeviceFullPath] log.Errorf("DeleteVolume: LoopDevice volume associated with devices: %s", loopdevicePath) _, err = conn.DeleteLoopDevice(ctx, pvObj.Name) if err != nil { diff --git a/pkg/utils/util.go b/pkg/utils/util.go index 66a78a6a6..e7b800a1d 100644 --- a/pkg/utils/util.go +++ b/pkg/utils/util.go @@ -115,10 +115,7 @@ const ( var KubernetesAlicloudIdentity = "Kubernetes.Alicloud/CsiPlugin" var ( - // NodeAddrMap map for NodeID and its Address - NodeAddrMap = map[string]string{} - // NodeAddrMutex Mutex for NodeAddr map - NodeAddrMutex sync.RWMutex + nodeAddrMap = sync.Map{} ) // RoleAuth define STS Token Response @@ -739,11 +736,8 @@ func Fsync(f *os.File) error { return f.Sync() } -// SetNodeAddrMap set map with mutex -func SetNodeAddrMap(key string, value string) { - NodeAddrMutex.Lock() - NodeAddrMap[key] = value - NodeAddrMutex.Unlock() +func ClearNodeIPCache(nodeID string) { + nodeAddrMap.Delete(nodeID) } // GetNodeAddr get node address @@ -752,13 +746,13 @@ func GetNodeAddr(client kubernetes.Interface, node string, port string) (string, if err != nil { return "", err } - return ip.String() + ":" + port, nil + return net.JoinHostPort(ip.String(), port), nil } // GetNodeIP get node address func GetNodeIP(client kubernetes.Interface, nodeID string) (net.IP, error) { - if value, ok := NodeAddrMap[nodeID]; ok && value != "" { - return net.ParseIP(value), nil + if value, ok := nodeAddrMap.Load(nodeID); ok { + return value.(net.IP), nil } node, err := client.CoreV1().Nodes().Get(context.Background(), nodeID, metav1.GetOptions{}) if err != nil { @@ -769,13 +763,12 @@ func GetNodeIP(client kubernetes.Interface, nodeID string) (net.IP, error) { for i := range addresses { addressMap[addresses[i].Type] = append(addressMap[addresses[i].Type], addresses[i]) } - if addresses, ok := addressMap[v1.NodeInternalIP]; ok { - SetNodeAddrMap(nodeID, addresses[0].Address) - return net.ParseIP(addresses[0].Address), nil - } - if addresses, ok := addressMap[v1.NodeExternalIP]; ok { - SetNodeAddrMap(nodeID, addresses[0].Address) - return net.ParseIP(addresses[0].Address), nil + for _, t := range []v1.NodeAddressType{v1.NodeInternalIP, v1.NodeExternalIP} { + if addresses, ok := addressMap[t]; ok { + ip := net.ParseIP(addresses[0].Address) + nodeAddrMap.Store(nodeID, ip) + return ip, nil + } } return nil, fmt.Errorf("Node IP unknown; known addresses: %v", addresses) }