diff --git a/go.mod b/go.mod index 2acb11cc67..f6f7162684 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/onsi/ginkgo v1.10.2 github.com/onsi/gomega v1.7.0 github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect + golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f google.golang.org/grpc v1.26.0 k8s.io/api v0.17.3 k8s.io/apimachinery v0.17.3 diff --git a/pkg/driver/node.go b/pkg/driver/node.go index 2dc3dc28f2..e67004b4be 100644 --- a/pkg/driver/node.go +++ b/pkg/driver/node.go @@ -22,15 +22,18 @@ import ( "os" "path/filepath" "regexp" + "strconv" "strings" csi "github.com/container-storage-interface/spec/lib/go/csi" "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud" "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/driver/internal" + "golang.org/x/sys/unix" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/klog" "k8s.io/kubernetes/pkg/util/resizefs" + "k8s.io/kubernetes/pkg/volume" "k8s.io/utils/exec" "k8s.io/utils/mount" ) @@ -65,6 +68,7 @@ var ( nodeCaps = []csi.NodeServiceCapability_RPC_Type{ csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, csi.NodeServiceCapability_RPC_EXPAND_VOLUME, + csi.NodeServiceCapability_RPC_GET_VOLUME_STATS, } ) @@ -344,7 +348,66 @@ func (d *nodeService) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu } func (d *nodeService) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) { - return nil, status.Error(codes.Unimplemented, "NodeGetVolumeStats is not implemented yet") + klog.V(4).Infof("NodeGetVolumeStats: called with args %+v", *req) + + if len(req.VolumeId) == 0 { + return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume ID was empty") + } + if len(req.VolumePath) == 0 { + return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume path was empty") + } + + exists, err := d.mounter.ExistsPath(req.VolumePath) + if err != nil { + return nil, status.Errorf(codes.Internal, "unknown error when stat on %s: %v", req.VolumePath, err) + } + if !exists { + return nil, status.Errorf(codes.NotFound, "path %s does not exist", req.VolumePath) + } + + isBlock, err := d.isBlockDevice(req.VolumePath) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to determine whether %s is block device: %v", req.VolumePath, err) + } + if isBlock { + var bcap int64 + bcap, err = d.getBlockSizeBytes(req.VolumePath) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to get block capacity on path %s: %v", req.VolumePath, err) + } + return &csi.NodeGetVolumeStatsResponse{ + Usage: []*csi.VolumeUsage{ + { + Unit: csi.VolumeUsage_BYTES, + Total: bcap, + }, + }, + }, nil + } + + metricsProvider := volume.NewMetricsStatFS(req.VolumePath) + + metrics, err := metricsProvider.GetMetrics() + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to get fs info on path %s: %v", req.VolumePath, err) + } + + return &csi.NodeGetVolumeStatsResponse{ + Usage: []*csi.VolumeUsage{ + { + Unit: csi.VolumeUsage_BYTES, + Available: metrics.Available.AsDec().UnscaledBig().Int64(), + Total: metrics.Capacity.AsDec().UnscaledBig().Int64(), + Used: metrics.Used.AsDec().UnscaledBig().Int64(), + }, + { + Unit: csi.VolumeUsage_INODES, + Available: metrics.InodesFree.AsDec().UnscaledBig().Int64(), + Total: metrics.Inodes.AsDec().UnscaledBig().Int64(), + Used: metrics.InodesUsed.AsDec().UnscaledBig().Int64(), + }, + }, + }, nil } func (d *nodeService) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { @@ -550,3 +613,27 @@ func hasMountOption(options []string, opt string) bool { } return false } + +func (d *nodeService) isBlockDevice(fullPath string) (bool, error) { + var st unix.Stat_t + err := unix.Stat(fullPath, &st) + if err != nil { + return false, err + } + + return (st.Mode & unix.S_IFMT) == unix.S_IFBLK, nil +} + +func (d *nodeService) getBlockSizeBytes(devicePath string) (int64, error) { + cmd := d.mounter.Command("blockdev", "--getsize64", devicePath) + output, err := cmd.Output() + if err != nil { + return -1, fmt.Errorf("error when getting size of block volume at path %s: output: %s, err: %v", devicePath, string(output), err) + } + strOut := strings.TrimSpace(string(output)) + gotSizeBytes, err := strconv.ParseInt(strOut, 10, 64) + if err != nil { + return -1, fmt.Errorf("failed to parse size %s into int a size", strOut) + } + return gotSizeBytes, nil +} diff --git a/pkg/driver/node_test.go b/pkg/driver/node_test.go index 7430d5fd1a..4fe13f5ffc 100644 --- a/pkg/driver/node_test.go +++ b/pkg/driver/node_test.go @@ -1175,6 +1175,9 @@ func TestNodeGetVolumeStats(t *testing.T) { mockMetadata := mocks.NewMockMetadataService(mockCtl) mockMounter := mocks.NewMockMounter(mockCtl) + VolumePath := "/test" + + mockMounter.EXPECT().ExistsPath(VolumePath).Return(true, nil) awsDriver := nodeService{ metadata: mockMetadata, @@ -1182,19 +1185,13 @@ func TestNodeGetVolumeStats(t *testing.T) { inFlight: internal.NewInFlight(), } - expErrCode := codes.Unimplemented - - req := &csi.NodeGetVolumeStatsRequest{} - _, err := awsDriver.NodeGetVolumeStats(context.TODO(), req) - if err == nil { - t.Fatalf("Expected error code %d, got nil", expErrCode) + req := &csi.NodeGetVolumeStatsRequest{ + VolumeId: "vol-test", + VolumePath: VolumePath, } - srvErr, ok := status.FromError(err) - if !ok { - t.Fatalf("Could not get error status code from error: %v", srvErr) - } - if srvErr.Code() != expErrCode { - t.Fatalf("Expected error code %d, got %d message %s", expErrCode, srvErr.Code(), srvErr.Message()) + _, err := awsDriver.NodeGetVolumeStats(context.TODO(), req) + if err != nil { + t.Fatalf("Expected no error, got %v", err) } }