Skip to content

Commit

Permalink
Implement NodeGetVolumeStats to support node volume metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
AndyXiangLi committed Jan 9, 2021
1 parent 0edd545 commit f7ec047
Show file tree
Hide file tree
Showing 5 changed files with 279 additions and 25 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/onsi/ginkgo v1.10.2
github.com/onsi/gomega v1.7.0
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f
google.golang.org/grpc v1.26.0
k8s.io/api v0.17.3
k8s.io/apimachinery v0.17.3
Expand Down
6 changes: 6 additions & 0 deletions pkg/driver/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func (m *NodeMounter) GetDeviceName(mountPath string) (string, int, error) {
return mount.GetDeviceNameFromMount(m, mountPath)
}

// This function is mirrored in ./sanity_test.go to make sure sanity test covered this block of code
// Please mirror the change to func MakeFile in ./sanity_test.go
func (m *NodeMounter) MakeFile(pathname string) error {
f, err := os.OpenFile(pathname, os.O_CREATE, os.FileMode(0644))
if err != nil {
Expand All @@ -66,6 +68,8 @@ func (m *NodeMounter) MakeFile(pathname string) error {
return nil
}

// This function is mirrored in ./sanity_test.go to make sure sanity test covered this block of code
// Please mirror the change to func MakeFile in ./sanity_test.go
func (m *NodeMounter) MakeDir(pathname string) error {
err := os.MkdirAll(pathname, os.FileMode(0755))
if err != nil {
Expand All @@ -76,6 +80,8 @@ func (m *NodeMounter) MakeDir(pathname string) error {
return nil
}

// This function is mirrored in ./sanity_test.go to make sure sanity test covered this block of code
// Please mirror the change to func MakeFile in ./sanity_test.go
func (m *NodeMounter) ExistsPath(filename string) (bool, error) {
if _, err := os.Stat(filename); os.IsNotExist(err) {
return false, nil
Expand Down
90 changes: 89 additions & 1 deletion pkg/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ package driver
import (
"context"
"fmt"
"golang.org/x/sys/unix"
"k8s.io/kubernetes/pkg/volume"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"

csi "github.com/container-storage-interface/spec/lib/go/csi"
Expand Down Expand Up @@ -65,6 +68,7 @@ var (
nodeCaps = []csi.NodeServiceCapability_RPC_Type{
csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
}
)

Expand Down Expand Up @@ -344,7 +348,66 @@ func (d *nodeService) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
}

func (d *nodeService) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
return nil, status.Error(codes.Unimplemented, "NodeGetVolumeStats is not implemented yet")
klog.V(4).Infof("NodeGetVolumeStats: called with args %+v", *req)
if len(req.VolumeId) == 0 {
return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume ID was empty")
}
if len(req.VolumePath) == 0 {
return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume path was empty")
}

exists, err := d.mounter.ExistsPath(req.VolumePath)
if err != nil {
return nil, status.Errorf(codes.Internal, "unknown error when stat on %s: %v", req.VolumePath, err)
}
if !exists {
return nil, status.Errorf(codes.NotFound, "path %s does not exist", req.VolumePath)
}

isBlock, err := d.IsBlockDevice(req.VolumePath)

if err != nil {
return nil, status.Errorf(codes.Internal, "failed to determine whether %s is block device: %v", req.VolumePath, err)
}
if isBlock {
bcap, err := d.getBlockSizeBytes(req.VolumePath)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get block capacity on path %s: %v", req.VolumePath, err)
}
return &csi.NodeGetVolumeStatsResponse{
Usage: []*csi.VolumeUsage{
{
Unit: csi.VolumeUsage_BYTES,
Total: bcap,
},
},
}, nil
}

metricsProvider := volume.NewMetricsStatFS(req.VolumePath)

metrics, err := metricsProvider.GetMetrics()
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get fs info on path %s: %v", req.VolumePath, err)
}

return &csi.NodeGetVolumeStatsResponse{
Usage: []*csi.VolumeUsage{
{
Unit: csi.VolumeUsage_BYTES,
Available: metrics.Available.AsDec().UnscaledBig().Int64(),
Total: metrics.Capacity.AsDec().UnscaledBig().Int64(),
Used: metrics.Used.AsDec().UnscaledBig().Int64(),
},
{
Unit: csi.VolumeUsage_INODES,
Available: metrics.InodesFree.AsDec().UnscaledBig().Int64(),
Total: metrics.Inodes.AsDec().UnscaledBig().Int64(),
Used: metrics.InodesUsed.AsDec().UnscaledBig().Int64(),
},
},
}, nil

}

func (d *nodeService) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
Expand Down Expand Up @@ -550,3 +613,28 @@ func hasMountOption(options []string, opt string) bool {
}
return false
}

// IsBlock checks if the given path is a block device
func (d *nodeService) IsBlockDevice(fullPath string) (bool, error) {
var st unix.Stat_t
err := unix.Stat(fullPath, &st)
if err != nil {
return false, err
}

return (st.Mode & unix.S_IFMT) == unix.S_IFBLK, nil
}

func (d *nodeService) getBlockSizeBytes(devicePath string) (int64, error) {
cmd := d.mounter.Command("blockdev", "--getsize64", devicePath)
output, err := cmd.Output()
if err != nil {
return -1, fmt.Errorf("error when getting size of block volume at path %s: output: %s, err: %v", devicePath, string(output), err)
}
strOut := strings.TrimSpace(string(output))
gotSizeBytes, err := strconv.ParseInt(strOut, 10, 64)
if err != nil {
return -1, fmt.Errorf("failed to parse size %s as int", strOut)
}
return gotSizeBytes, nil
}
168 changes: 148 additions & 20 deletions pkg/driver/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package driver
import (
"context"
"errors"
"os"
"reflect"
"strings"
"testing"
Expand Down Expand Up @@ -1162,6 +1163,31 @@ func TestNodeUnpublishVolume(t *testing.T) {
expectErr(t, err, codes.InvalidArgument)
},
},
{
name: "fail error on unmount",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()

mockMetadata := mocks.NewMockMetadataService(mockCtl)
mockMounter := mocks.NewMockMounter(mockCtl)

awsDriver := &nodeService{
metadata: mockMetadata,
mounter: mockMounter,
inFlight: internal.NewInFlight(),
}

req := &csi.NodeUnpublishVolumeRequest{
TargetPath: targetPath,
VolumeId: "vol-test",
}

mockMounter.EXPECT().Unmount(gomock.Eq(targetPath)).Return(errors.New("test Unmount error"))
_, err := awsDriver.NodeUnpublishVolume(context.TODO(), req)
expectErr(t, err, codes.Internal)
},
},
}

for _, tc := range testCases {
Expand All @@ -1170,32 +1196,127 @@ func TestNodeUnpublishVolume(t *testing.T) {
}

func TestNodeGetVolumeStats(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()
testCases := []struct {
name string
testFunc func(t *testing.T)
}{
{
name: "success normal",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()

mockMetadata := mocks.NewMockMetadataService(mockCtl)
mockMounter := mocks.NewMockMounter(mockCtl)
mockMetadata := mocks.NewMockMetadataService(mockCtl)
mockMounter := mocks.NewMockMounter(mockCtl)
VolumePath := "./test"
err := os.MkdirAll(VolumePath, 0644)
if err != nil {
t.Fatalf("fail to create dir: %v", err)
}
defer os.RemoveAll(VolumePath)

awsDriver := nodeService{
metadata: mockMetadata,
mounter: mockMounter,
inFlight: internal.NewInFlight(),
}
mockMounter.EXPECT().ExistsPath(VolumePath).Return(true, nil)

awsDriver := nodeService{
metadata: mockMetadata,
mounter: mockMounter,
inFlight: internal.NewInFlight(),
}

expErrCode := codes.Unimplemented
req := &csi.NodeGetVolumeStatsRequest{
VolumeId: "vol-test",
VolumePath: VolumePath,
}
_, err = awsDriver.NodeGetVolumeStats(context.TODO(), req)
if err != nil {
t.Fatalf("Expect no error but got: %v", err)
}
},
},
{
name: "fail path not exist",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()

req := &csi.NodeGetVolumeStatsRequest{}
_, err := awsDriver.NodeGetVolumeStats(context.TODO(), req)
if err == nil {
t.Fatalf("Expected error code %d, got nil", expErrCode)
}
srvErr, ok := status.FromError(err)
if !ok {
t.Fatalf("Could not get error status code from error: %v", srvErr)
mockMetadata := mocks.NewMockMetadataService(mockCtl)
mockMounter := mocks.NewMockMounter(mockCtl)
VolumePath := "/test"

mockMounter.EXPECT().ExistsPath(VolumePath).Return(false, nil)

awsDriver := nodeService{
metadata: mockMetadata,
mounter: mockMounter,
inFlight: internal.NewInFlight(),
}

req := &csi.NodeGetVolumeStatsRequest{
VolumeId: "vol-test",
VolumePath: VolumePath,
}
_, err := awsDriver.NodeGetVolumeStats(context.TODO(), req)
expectErr(t, err, codes.NotFound)
},
},
{
name: "fail can't determine block device",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()

mockMetadata := mocks.NewMockMetadataService(mockCtl)
mockMounter := mocks.NewMockMounter(mockCtl)
VolumePath := "/test"

mockMounter.EXPECT().ExistsPath(VolumePath).Return(true, nil)

awsDriver := nodeService{
metadata: mockMetadata,
mounter: mockMounter,
inFlight: internal.NewInFlight(),
}

req := &csi.NodeGetVolumeStatsRequest{
VolumeId: "vol-test",
VolumePath: VolumePath,
}
_, err := awsDriver.NodeGetVolumeStats(context.TODO(), req)
expectErr(t, err, codes.Internal)
},
},
{
name: "fail error calling existsPath",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()

mockMetadata := mocks.NewMockMetadataService(mockCtl)
mockMounter := mocks.NewMockMounter(mockCtl)
VolumePath := "/test"

mockMounter.EXPECT().ExistsPath(VolumePath).Return(false, errors.New("get existsPath call fail"))

awsDriver := nodeService{
metadata: mockMetadata,
mounter: mockMounter,
inFlight: internal.NewInFlight(),
}

req := &csi.NodeGetVolumeStatsRequest{
VolumeId: "vol-test",
VolumePath: VolumePath,
}
_, err := awsDriver.NodeGetVolumeStats(context.TODO(), req)
expectErr(t, err, codes.Internal)
},
},
}
if srvErr.Code() != expErrCode {
t.Fatalf("Expected error code %d, got %d message %s", expErrCode, srvErr.Code(), srvErr.Message())

for _, tc := range testCases {
t.Run(tc.name, tc.testFunc)
}

}

func TestNodeGetCapabilities(t *testing.T) {
Expand Down Expand Up @@ -1226,6 +1347,13 @@ func TestNodeGetCapabilities(t *testing.T) {
},
},
},
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
},
},
},
}
expResp := &csi.NodeGetCapabilitiesResponse{Capabilities: caps}

Expand Down
Loading

0 comments on commit f7ec047

Please sign in to comment.