From 179a992178e5718a36165a4105870d3dd9b26405 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 6 Jan 2021 13:55:18 -0800 Subject: [PATCH] add volume stats metrics update tests --- go.mod | 1 + pkg/driver/node.go | 78 ++++++++++++++++++++++++++++++++++++++- pkg/driver/node_test.go | 28 ++++++++------ pkg/driver/sanity_test.go | 55 +++++++++++++++++++++++++-- pkg/driver/statter.go | 71 +++++++++++++++++++++++++++++++++++ 5 files changed, 216 insertions(+), 17 deletions(-) create mode 100644 pkg/driver/statter.go diff --git a/go.mod b/go.mod index 2acb11cc67..f6f7162684 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/onsi/ginkgo v1.10.2 github.com/onsi/gomega v1.7.0 github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect + golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f google.golang.org/grpc v1.26.0 k8s.io/api v0.17.3 k8s.io/apimachinery v0.17.3 diff --git a/pkg/driver/node.go b/pkg/driver/node.go index 2dc3dc28f2..bf7abd4962 100644 --- a/pkg/driver/node.go +++ b/pkg/driver/node.go @@ -22,6 +22,7 @@ import ( "os" "path/filepath" "regexp" + "strconv" "strings" csi "github.com/container-storage-interface/spec/lib/go/csi" @@ -65,6 +66,7 @@ var ( nodeCaps = []csi.NodeServiceCapability_RPC_Type{ csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, csi.NodeServiceCapability_RPC_EXPAND_VOLUME, + csi.NodeServiceCapability_RPC_GET_VOLUME_STATS, } ) @@ -72,6 +74,7 @@ var ( type nodeService struct { metadata cloud.MetadataService mounter Mounter + statter Statter inFlight *internal.InFlight driverOptions *DriverOptions } @@ -87,6 +90,7 @@ func newNodeService(driverOptions *DriverOptions) nodeService { return nodeService{ metadata: metadata, mounter: newNodeMounter(), + statter: NewStatter(), inFlight: internal.NewInFlight(), driverOptions: driverOptions, } @@ -344,7 +348,65 @@ func (d *nodeService) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu } func (d *nodeService) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) { - return nil, status.Error(codes.Unimplemented, "NodeGetVolumeStats is not implemented yet") + klog.V(4).Infof("NodeGetVolumeStats: called with args %+v", *req) + if len(req.VolumeId) == 0 { + return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume ID was empty") + } + if len(req.VolumePath) == 0 { + return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume path was empty") + } + + exists, err := d.mounter.ExistsPath(req.VolumePath) + if err != nil { + return nil, status.Errorf(codes.Internal, "unknown error when stat on %s: %v", req.VolumePath, err) + } + if !exists { + return nil, status.Errorf(codes.NotFound, "path %s does not exist", req.VolumePath) + } + + isBlock, err := d.statter.IsBlockDevice(req.VolumePath) + + if err != nil { + return nil, status.Errorf(codes.NotFound, "failed to determine whether %s is block device: %v", req.VolumePath, err) + } + if isBlock { + bcap, err := d.getBlockSizeBytes(req.VolumePath) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to get block capacity on path %s: %v", req.VolumePath, err) + } + return &csi.NodeGetVolumeStatsResponse{ + Usage: []*csi.VolumeUsage{ + { + Unit: csi.VolumeUsage_BYTES, + Total: bcap, + }, + }, + }, nil + } + + available, capacity, used, inodesFree, inodes, inodesUsed, err := d.statter.StatFS(req.VolumePath) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to get fs info on path %s: %v", req.VolumePath, err) + } + + return &csi.NodeGetVolumeStatsResponse{ + Usage: []*csi.VolumeUsage{ + { + Available: available, + Total: capacity, + Used: used, + Unit: csi.VolumeUsage_BYTES, + }, + + { + Available: inodesFree, + Total: inodes, + Used: inodesUsed, + Unit: csi.VolumeUsage_INODES, + }, + }, + }, nil + } func (d *nodeService) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { @@ -550,3 +612,17 @@ func hasMountOption(options []string, opt string) bool { } return false } + +func (d *nodeService) getBlockSizeBytes(devicePath string) (int64, error) { + cmd := d.mounter.Command("blockdev", "--getsize64", devicePath) + output, err := cmd.Output() + if err != nil { + return -1, fmt.Errorf("error when getting size of block volume at path %s: output: %s, err: %v", devicePath, string(output), err) + } + strOut := strings.TrimSpace(string(output)) + gotSizeBytes, err := strconv.ParseInt(strOut, 10, 64) + if err != nil { + return -1, fmt.Errorf("failed to parse size %s as int", strOut) + } + return gotSizeBytes, nil +} diff --git a/pkg/driver/node_test.go b/pkg/driver/node_test.go index 7430d5fd1a..3d585e7e7f 100644 --- a/pkg/driver/node_test.go +++ b/pkg/driver/node_test.go @@ -1175,26 +1175,23 @@ func TestNodeGetVolumeStats(t *testing.T) { mockMetadata := mocks.NewMockMetadataService(mockCtl) mockMounter := mocks.NewMockMounter(mockCtl) + VolumePath := "/test" + mockMounter.EXPECT().ExistsPath(VolumePath).Return(true, nil) awsDriver := nodeService{ metadata: mockMetadata, mounter: mockMounter, + statter: NewFakeStatter(), inFlight: internal.NewInFlight(), } - expErrCode := codes.Unimplemented - - req := &csi.NodeGetVolumeStatsRequest{} - _, err := awsDriver.NodeGetVolumeStats(context.TODO(), req) - if err == nil { - t.Fatalf("Expected error code %d, got nil", expErrCode) - } - srvErr, ok := status.FromError(err) - if !ok { - t.Fatalf("Could not get error status code from error: %v", srvErr) + req := &csi.NodeGetVolumeStatsRequest{ + VolumeId: "vol-test", + VolumePath: VolumePath, } - if srvErr.Code() != expErrCode { - t.Fatalf("Expected error code %d, got %d message %s", expErrCode, srvErr.Code(), srvErr.Message()) + _, err := awsDriver.NodeGetVolumeStats(context.TODO(), req) + if err != nil { + t.Fatalf("Expected no error, got %v", err) } } @@ -1226,6 +1223,13 @@ func TestNodeGetCapabilities(t *testing.T) { }, }, }, + { + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_GET_VOLUME_STATS, + }, + }, + }, } expResp := &csi.NodeGetCapabilitiesResponse{Capabilities: caps} diff --git a/pkg/driver/sanity_test.go b/pkg/driver/sanity_test.go index 8afb81dc63..f40af736a8 100644 --- a/pkg/driver/sanity_test.go +++ b/pkg/driver/sanity_test.go @@ -26,14 +26,16 @@ func TestSanity(t *testing.T) { } defer os.RemoveAll(dir) - targetPath := filepath.Join(dir, "target") + targetPath := filepath.Join(dir, "mount") stagingPath := filepath.Join(dir, "staging") endpoint := "unix://" + filepath.Join(dir, "csi.sock") config := &sanity.Config{ - TargetPath: targetPath, - StagingPath: stagingPath, - Address: endpoint, + TargetPath: targetPath, + StagingPath: stagingPath, + Address: endpoint, + CreateTargetDir: createDir, + CreateStagingDir: createDir, } driverOptions := &DriverOptions{ @@ -54,6 +56,7 @@ func TestSanity(t *testing.T) { AvailabilityZone: "az", }, mounter: newFakeMounter(), + statter: NewFakeStatter(), inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, }, @@ -73,6 +76,15 @@ func TestSanity(t *testing.T) { sanity.Test(t, config) } +func createDir(targetPath string) (string, error) { + if err := os.MkdirAll(targetPath, 0755); err != nil { + if !os.IsExist(err) { + return "", err + } + } + return targetPath, nil +} + type fakeCloudProvider struct { disks map[string]*fakeDisk // snapshots contains mapping from snapshot ID to snapshot @@ -309,13 +321,48 @@ func (f *fakeMounter) GetDeviceName(mountPath string) (string, int, error) { } func (f *fakeMounter) MakeFile(pathname string) error { + file, err := os.OpenFile(pathname, os.O_CREATE, os.FileMode(0644)) + if err != nil { + if !os.IsExist(err) { + return err + } + } + if err = file.Close(); err != nil { + return err + } return nil } func (f *fakeMounter) MakeDir(pathname string) error { + err := os.MkdirAll(pathname, os.FileMode(0755)) + if err != nil { + if !os.IsExist(err) { + return err + } + } return nil } func (f *fakeMounter) ExistsPath(filename string) (bool, error) { + if _, err := os.Stat(filename); os.IsNotExist(err) { + return false, nil + } else if err != nil { + return false, err + } return true, nil } + +type fakeStatter struct{} + +func NewFakeStatter() fakeStatter { + return fakeStatter{} +} + +func (fakeStatter) StatFS(path string) (available, capacity, used, inodesFree, inodes, inodesUsed int64, err error) { + // Assume the file exists and give some dummy values back + return 1, 1, 1, 1, 1, 1, nil +} + +func (fakeStatter) IsBlockDevice(fullPath string) (bool, error) { + return false, nil +} diff --git a/pkg/driver/statter.go b/pkg/driver/statter.go new file mode 100644 index 0000000000..aa66ea2dc1 --- /dev/null +++ b/pkg/driver/statter.go @@ -0,0 +1,71 @@ +/* +Copyright 2019 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package driver + +import ( + "fmt" + "golang.org/x/sys/unix" + "runtime" +) + +type Statter interface { + StatFS(path string) (int64, int64, int64, int64, int64, int64, error) + IsBlockDevice(string) (bool, error) +} + +var _ Statter = statter{} + +type statter struct { +} + +func NewStatter() statter { + return statter{} +} + +// IsBlock checks if the given path is a block device +func (statter) IsBlockDevice(fullPath string) (bool, error) { + if runtime.GOOS == "windows" { + return false, nil + } + var st unix.Stat_t + err := unix.Stat(fullPath, &st) + if err != nil { + return false, err + } + + return (st.Mode & unix.S_IFMT) == unix.S_IFBLK, nil +} + +func (statter) StatFS(path string) (available, capacity, used, inodesFree, inodes, inodesUsed int64, err error) { + if runtime.GOOS == "windows" { + return 0, 0, 0, 0, 0, 0, fmt.Errorf("Not implemented") + } + statfs := &unix.Statfs_t{} + err = unix.Statfs(path, statfs) + if err != nil { + err = fmt.Errorf("failed to get fs info on path %s: %v", path, err) + return + } + + // Available is blocks available * fragment size + available = int64(statfs.Bavail) * int64(statfs.Bsize) + // Capacity is total block count * fragment size + capacity = int64(statfs.Blocks) * int64(statfs.Bsize) + // Usage is block being used * fragment size (aka block size). + used = (int64(statfs.Blocks) - int64(statfs.Bfree)) * int64(statfs.Bsize) + inodes = int64(statfs.Files) + inodesFree = int64(statfs.Ffree) + inodesUsed = inodes - inodesFree + return +}