Skip to content

Commit

Permalink
Support rund csi3.0 protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
mowangdk committed Oct 14, 2024
1 parent f6a0d2e commit 7ba78d8
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 107 deletions.
4 changes: 4 additions & 0 deletions pkg/cloud/metadata/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,17 @@ var (
"alibabacloud.com/ecs-instance-id",
"sigma.ali/ecs-instance-id",
}
VmocLabels = []string {
"rm.alibaba-inc.com/vbm",
}
)

// MetadataLabels associates each MetadataKey with the slice of label keys
// declared for it in this package (RegionIDLabels, ZoneIDLabels,
// InstanceTypeLabels, InstanceIdLabels and VmocLabels).
// NOTE(review): presumably these are Kubernetes node label keys that are tried
// when resolving the value of a key — confirm against the lookup code.
var MetadataLabels = map[MetadataKey][]string{
RegionID: RegionIDLabels,
ZoneID: ZoneIDLabels,
InstanceType: InstanceTypeLabels,
InstanceID: InstanceIdLabels,
VmocType: VmocLabels,
}

func init() {
Expand Down
15 changes: 9 additions & 6 deletions pkg/cloud/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ import (
// MetadataKey identifies a single piece of metadata that can be requested
// from a metadata provider.
type MetadataKey int

// Known metadata keys. The values are assigned sequentially by iota, starting
// at 0 for RegionID; each name is declared exactly once so the group compiles
// and the numbering stays stable.
const (
	RegionID MetadataKey = iota
	ZoneID
	InstanceID
	InstanceType
	AccountID
	ClusterID
	// VmocType is the key backed by VmocLabels.
	// NOTE(review): the node server appears to treat the value "true" as
	// "use the DFBus I/O type" — confirm against NewNodeServer.
	VmocType
)

func (k MetadataKey) String() string {
Expand All @@ -38,6 +39,8 @@ func (k MetadataKey) String() string {
return "AccountID"
case ClusterID:
return "ClusterID"
case VmocType:
return "VmocType"
default:
return fmt.Sprintf("MetadataKey(%d)", k)
}
Expand Down
58 changes: 52 additions & 6 deletions pkg/disk/bdf.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ func IsNoSuchDeviceErr(err error) bool {
return strings.Contains(strings.ToLower(err.Error()), "no such device")
}

// IsNoSuchFileErr reports whether err's message contains the text
// "no such file or directory", ignoring case. A nil error yields false.
func IsNoSuchFileErr(err error) bool {
	if err != nil {
		message := strings.ToLower(err.Error())
		return strings.Contains(message, "no such file or directory")
	}
	return false
}

// IohubSriovBind io hub bind
func IohubSriovBind(bdf string) error {
return ioutil.WriteFile(iohubSriovAction+"bind", []byte(bdf), 0600)
Expand Down Expand Up @@ -491,6 +498,10 @@ const (
DFBus // 1
)

const (
// dfBusDevicePathPattern is the glob pattern matching the "type" file of every
// dfvirtio* device directory under /sys/bus/dragonfly/devices. NewDeviceDriver
// expands it with filepath.Glob to find the device that belongs to a volume.
dfBusDevicePathPattern = "/sys/bus/dragonfly/devices/dfvirtio*/type"
)

func (_type MachineType) BusName() string {
busNames := [...]string{
BDFTypeBus,
Expand Down Expand Up @@ -524,20 +535,55 @@ type Driver interface {
GetDeviceNumber() string
}

func NewDeviceDriver(blockDevice, deviceNumber string, _type MachineType, extras map[string]string) (Driver, error) {
func NewDeviceDriver(volumeId, blockDevice, deviceNumber string, _type MachineType, extras map[string]string) (Driver, error) {
d := &driver{
blockDevice: blockDevice,
deviceNumber: deviceNumber,
machineType: _type,
extras: extras,
}
if d.deviceNumber == "" {
deviceNumber, err := DefaultDeviceManager.GetDeviceNumberFromBlockDevice(blockDevice, d.machineType.BusPrefix())
if blockDevice != "" {
if deviceNumber == "" {
deviceNumber, err := DefaultDeviceManager.GetDeviceNumberFromBlockDevice(blockDevice, d.machineType.BusPrefix())
if err != nil {
klog.Errorf("NewDeviceDriver: get device number from block device err: %v", err)
return nil, err
}
d.deviceNumber = deviceNumber
}
}
if deviceNumber != "" {
return d, nil
}
if _type == DFBus {
matchesFile, err := filepath.Glob(dfBusDevicePathPattern)
if err != nil {
return nil, fmt.Errorf("Failed to list DFbus type files path. err: %v", err)
}
for _, path := range matchesFile {
body, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("Dfbus read type file %q failed: %v", path, err)
}
infos := strings.Split(string(body), " ")
if len(infos) != 2 {
return nil, fmt.Errorf("Dfbus type file format error")
}
if infos[0] != "block" {
continue
}
if infos[1] == strings.TrimPrefix(volumeId, "d-") {
DFNumber := filepath.Base(filepath.Dir(path))
d.deviceNumber = DFNumber
return d, nil
}
}
} else {
output, err := utils.CommandOnNode("xdragon-bdf", "--nvme", "-id=%s", volumeId).CombinedOutput()
if err != nil {
klog.Errorf("NewDeviceDriver: get device number from block device err: %v", err)
return nil, err
return nil, fmt.Errorf("Failed to excute bdf command: %s, err: %v", volumeId, err)
}
d.deviceNumber = deviceNumber
d.deviceNumber = string(output)
}
return d, nil
}
Expand Down
164 changes: 69 additions & 95 deletions pkg/disk/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud/metadata"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/common"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/features"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils"
utilsio "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils/io"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils/rund/directvolume"
Expand Down Expand Up @@ -186,10 +187,15 @@ func NewNodeServer(m metadata.MetadataProvider) csi.NodeServer {
}

kataBMIOType := BDF
if bmType := os.Getenv("KATA_BM_IO_TYPE"); bmType == DFBus.BusName() {
value, err := m.Get(metadata.VmocType)
if err != nil {
klog.Errorf("get vmoc failed: %+v", err)
}
if err == nil && value == "true" {
kataBMIOType = DFBus
}

klog.Infof("KATA BFIO Type: %v", kataBMIOType)
podCgroup, err := utils.NewPodCGroup()
if err != nil {
klog.Fatalf("Failed to initialize pod cgroup: %v", err)
Expand Down Expand Up @@ -302,7 +308,8 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
}
klog.Infof("NodePublishVolume: TargetPath: %s is umounted, start mount in kata mode", req.TargetPath)
mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags()
returned, err := ns.mountRunDVolumes(req.VolumeId, sourcePath, req.TargetPath, fsType, mkfsOptions, isBlock, mountFlags)
pvName := utils.GetPvNameFormPodMnt(targetPath)
returned, err := ns.mountRunDVolumes(req.VolumeId, pvName, sourcePath, req.TargetPath, fsType, mkfsOptions, isBlock, mountFlags)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1194,13 +1201,21 @@ func (ns *nodeServer) umountRunDVolumes(volumePath string) (bool, error) {
return false, status.Error(codes.Internal, "vmoc(DFBus) mode only support csi-runD protocol 3.0")
}

d, _ := NewDeviceDriver("", mountInfo.Source, DFBus, nil)
d, _ := NewDeviceDriver("", "", mountInfo.Source, DFBus, nil)
cDriver, err := d.CurentDriver()
if err != nil {
if IsNoSuchFileErr(err) {
klog.Infof("vmoc(DFBus) dfbus driver has been removed: %s", DFBus)
return true, nil
}
return true, status.Error(codes.Internal, err.Error())
}
klog.Infof("vmoc(DFBus) current dfbus driver : %v", DFBus)
if cDriver == DFBusTypeVIRTIO {
return true, nil
}
if cDriver != DFBusTypeVFIO {
return true, status.Error(codes.Internal, "vmoc(DFBus) mode only support vfio")
return true, status.Error(codes.Internal, "vmoc(DFBus) mode only support vfio, virtio driver")
}
err = d.UnbindDriver()
if err != nil {
Expand All @@ -1210,7 +1225,6 @@ func (ns *nodeServer) umountRunDVolumes(volumePath string) (bool, error) {
if err != nil {
return true, status.Errorf(codes.Internal, "vmoc(DFBus) bind err: %s", err.Error())
}
return true, nil
}

klog.Infof("NodeUnPublishVolume:: start delete mount info for KataVolume: %s", volumePath)
Expand Down Expand Up @@ -1284,125 +1298,86 @@ func (ns *nodeServer) mountRunvVolumes(volumeId, sourcePath, targetPath, fsType,
return nil
}

func (ns *nodeServer) mountRunDVolumes(volumeId, sourcePath, targetPath, fsType, mkfsOptions string, isRawBlock bool, mountFlags []string) (bool, error) {
func (ns *nodeServer) mountRunDVolumes(volumeId, pvName, sourcePath, targetPath, fsType, mkfsOptions string, isRawBlock bool, mountFlags []string) (bool, error) {
klog.Infof("NodePublishVolume:: Disk Volume %s Mounted in RunD csi 3.0/2.0 protocol", volumeId)
deviceName, err := DefaultDeviceManager.GetDeviceByVolumeID(volumeId)
if err != nil {
deviceName = getVolumeConfig(volumeId)
}
if deviceName == "" {
klog.Errorf("NodePublishVolume(rund): cannot get local deviceName for volume: %s", volumeId)
return true, status.Error(codes.InvalidArgument, "NodePublishVolume: cannot get local deviceName for volume: "+volumeId)
}
deviceNumber := ""
volumePath := filepath.Dir(targetPath)

// Block runs csi3.0 protocol
if isRawBlock {
if features.FunctionalMutableFeatureGate.Enabled(features.RundCSIProtocol3) {
// umount the stage path, which is mounted in Stage
if err := ns.unmountStageTarget(sourcePath); err != nil {
klog.Errorf("NodePublishVolume(rund3.0): unmountStageTarget %s with error: %s", sourcePath, err.Error())
return true, status.Error(codes.InvalidArgument, "NodePublishVolume: unmountStageTarget "+sourcePath+" with error: "+err.Error())
}
klog.Infof("NodePublishVolume(rund3.0): get bdf number by device: %s", deviceName)
deviceNumber := ""

driver, err := NewDeviceDriver(volumeId, deviceName, deviceNumber, ns.kataBMIOType, map[string]string{})
if err != nil {
klog.Errorf("NodePublishVolume(rund3.0): can't get bdf number of volume: %s: err: %v", volumeId, err)
return true, status.Error(codes.InvalidArgument, "NodePublishVolume: cannot get bdf number of volume: "+volumeId)
}
currentDriver, err := driver.CurentDriver()
if err != nil {
return true, status.Errorf(codes.Internal, "NodePublishVolume(rund3.0): can't get current volume driver: %+v", err)
}
deviceType := directvolume.DeviceTypePCI
if ns.kataBMIOType == DFBus {
deviceType = directvolume.DeviceTypeDFBusPort
driver, err := NewDeviceDriver(deviceName, "", DFBus, map[string]string{})
}
extras := make(map[string]string)
// for volume resize socket generation
extras["PVName"] = pvName
extras["DiskId"] = volumeId
if isRawBlock {
klog.V(2).Infof("NodePublishVolume(rund3.0): get bdf number by device: %s", deviceName)
deviceUid := 0
deviceGid := 0
deviceInfo, err := os.Stat(deviceName)
if err != nil {
klog.Errorf("NodePublishVolume(rund3.0): can't get dfbusport number of volume: %s: err: %v", volumeId, err)
return true, status.Error(codes.InvalidArgument, "NodePublishVolume: cannot get bdf number of volume: "+volumeId)
}
deviceNumber = driver.GetDeviceNumber()
// we can find deviceName means that device is bind to virtio driver
if err := driver.UnbindDriver(); err != nil {
klog.Errorf("NodePublishVolume(rund3.0): can't unbind current device driver: %s", deviceNumber)
return true, status.Error(codes.InvalidArgument, "NodePublishVolume: cannot unbind current device driver: "+deviceNumber)
}
if err = driver.BindDriver(DFBusTypeVFIO); err != nil {
klog.Errorf("NodePublishVolume(rund3.0): can't bind vfio driver to device: %s", deviceNumber)
klog.Errorf("NodePublishVolume(rund3.0): can't get device info of volume: %s: err: %v", volumeId, err)
}
} else {
driver, err := NewDeviceDriver(deviceName, "", BDF, map[string]string{})
if err != nil {
klog.Errorf("NodePublishVolume(rund3.0): can't get bdf number of volume: %s: err: %v", volumeId, err)
return true, status.Error(codes.InvalidArgument, "NodePublishVolume: cannot get bdf number of volume: "+volumeId)
if stat, ok := deviceInfo.Sys().(*syscall.Stat_t); ok {
deviceUid = int(stat.Uid)
deviceGid = int(stat.Gid)
}
deviceNumber = driver.GetDeviceNumber()
}
deviceUid := 0
deviceGid := 0
deviceInfo, err := os.Stat(deviceName)
if err != nil {
klog.Errorf("NodePublishVolume(rund3.0): can't get device info of volume: %s: err: %v", volumeId, err)
extras["Type"] = "b"
extras["FileMode"] = directvolume.BlockFileModeReadWrite
extras["Uid"] = strconv.Itoa(deviceUid)
extras["Gid"] = strconv.Itoa(deviceGid)
}
if stat, ok := deviceInfo.Sys().(*syscall.Stat_t); ok {
deviceUid = int(stat.Uid)
deviceGid = int(stat.Gid)
}
extras := make(map[string]string)
extras["Type"] = "b"
extras["FileMode"] = directvolume.BlockFileModeReadWrite
extras["Uid"] = strconv.Itoa(deviceUid)
extras["Gid"] = strconv.Itoa(deviceGid)

mountOptions := []string{}
if mountFlags != nil {
mountOptions = mountFlags
}

mountInfo := directvolume.MountInfo{
Source: deviceNumber,
Source: driver.GetDeviceNumber(),
DeviceType: deviceType,
MountOpts: mountOptions,
Extra: extras,
FSType: fsType,
}

klog.Info("NodePublishVolume(rund3.0): Starting add mount info to DirectVolume")
err = directvolume.AddMountInfo(volumePath, mountInfo)
if err != nil {
klog.Errorf("NodePublishVolume(rund3.0): Adding mount infomation to DirectVolume failed: %v", err)
return true, err
}
klog.Info("NodePublishVolume(rund3.0): Adding mount information to DirectVolume succeeds, return immediately")
return true, nil
}

if ns.kataBMIOType == DFBus {
// umount the stage path, which is mounted in Stage
if err := ns.unmountStageTarget(sourcePath); err != nil {
klog.Errorf("NodePublishVolume(rund3.0): unmountStageTarget in vmoc(DFBus) mode %s with error: %s", sourcePath, err.Error())
return true, status.Error(codes.InvalidArgument, "NodePublishVolume: unmountStageTarget in vmoc(DFBus) "+sourcePath+" with error: "+err.Error())
}
klog.Infof("NodePublishVolume(rund3.0): get dfbusport number by device: %s", deviceName)
driver, err := NewDeviceDriver(deviceName, "", DFBus, map[string]string{})
if err != nil {
klog.Errorf("NodePublishVolume(rund3.0): can't get dfbusport number of volume: %s: err: %v", volumeId, err)
return true, status.Error(codes.InvalidArgument, "NodePublishVolume: cannot get bdf number of volume: "+volumeId)
}
// we can find deviceName means that device is bind to virtio driver
if err := driver.UnbindDriver(); err != nil {
klog.Errorf("NodePublishVolume(rund3.0): can't unbind current device driver: %s", driver.GetDeviceNumber())
return true, status.Error(codes.InvalidArgument, "NodePublishVolume: cannot unbind current device driver: "+driver.GetDeviceNumber())
}
if err = driver.BindDriver(DFBusTypeVFIO); err != nil {
klog.Errorf("NodePublishVolume(rund3.0): can't bind vfio driver to device: %s", driver.GetDeviceNumber())
}
if err != nil {
klog.Errorf("NodePublishVolume(rund3.0): can't get dfbusport number of volume: %s: err: %v", volumeId, err)
return true, status.Error(codes.InvalidArgument, "NodePublishVolume: cannot get dfbusport number of volume: "+volumeId)
}
mountOptions := []string{}
if mountFlags != nil {
mountOptions = mountFlags
}

mountInfo := directvolume.MountInfo{
Source: driver.GetDeviceNumber(),
DeviceType: directvolume.DeviceTypeDFBusPort,
MountOpts: mountOptions,
Extra: map[string]string{},
FSType: fsType,
if currentDriver == DFBusTypeVIRTIO || currentDriver == PCITypeVIRTIO {
if err := driver.UnbindDriver(); err != nil {
klog.Errorf("NodePublishVolume(rund3.0): can't unbind current device driver: %s", deviceNumber)
return true, status.Error(codes.InvalidArgument, "NodePublishVolume: cannot unbind current device driver: "+deviceNumber)
}
if ns.kataBMIOType == DFBus {
if err = driver.BindDriver(DFBusTypeVFIO); err != nil {
klog.Errorf("NodePublishVolume(rund3.0): can't bind vfio driver to device: %s", deviceNumber)
return true, status.Error(codes.InvalidArgument, "NodePublishVolume: cannot bind current device driver: "+deviceNumber)
}
} else {
if err = driver.BindDriver(PCITypeVFIO); err != nil {
klog.Errorf("NodePublishVolume(rund3.0): can't bind vfio driver to device: %s", deviceNumber)
return true, status.Error(codes.InvalidArgument, "NodePublishVolume: cannot bind current device driver: "+deviceNumber)
}
}
}

klog.Info("NodePublishVolume(rund3.0): Starting add vmoc(DFBus) mount info to DirectVolume")
Expand All @@ -1414,7 +1389,6 @@ func (ns *nodeServer) mountRunDVolumes(volumeId, sourcePath, targetPath, fsType,
klog.Info("NodePublishVolume(rund3.0): Adding vmoc(DFBus) mount information to DirectVolume succeeds, return immediately")
return true, nil
}

// (runD2.0) Need write mountOptions(metadata) parameters to file, and run normal runc process
klog.Infof("NodePublishVolume(rund): run csi runD protocol 2.0 logic")
volumeData := map[string]string{}
Expand All @@ -1431,7 +1405,7 @@ func (ns *nodeServer) mountRunDVolumes(volumeId, sourcePath, targetPath, fsType,
fileName = filepath.Join(filepath.Dir(filepath.Dir(targetPath)), directvolume.RunD2MountInfoFileName)
}
if err = utils.AppendJSONData(fileName, volumeData); err != nil {
klog.Warningf("NodeStageVolume: append volume spec to %s with error: %s", fileName, err.Error())
klog.Warning("NodeStageVolume: append volume spec to %s with error: %s", fileName, err.Error())
}
return false, nil

Expand Down

0 comments on commit 7ba78d8

Please sign in to comment.