Skip to content

Commit

Permalink
add volume metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
AndyXiangLi committed Jan 7, 2021
1 parent e0b0ffc commit 13bf1a3
Show file tree
Hide file tree
Showing 8 changed files with 380 additions and 25 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/onsi/ginkgo v1.10.2
github.com/onsi/gomega v1.7.0
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f
google.golang.org/grpc v1.26.0
k8s.io/api v0.17.3
k8s.io/apimachinery v0.17.3
Expand Down
68 changes: 68 additions & 0 deletions pkg/driver/mocks/mock_statter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions pkg/driver/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func (m *NodeMounter) GetDeviceName(mountPath string) (string, int, error) {
return mount.GetDeviceNameFromMount(m, mountPath)
}

// This function is mirrored in ./sanity_test.go to make sure sanity test covered this block of code
// Please mirror the change to func MakeFile in ./sanity_test.go
func (m *NodeMounter) MakeFile(pathname string) error {
f, err := os.OpenFile(pathname, os.O_CREATE, os.FileMode(0644))
if err != nil {
Expand All @@ -66,6 +68,8 @@ func (m *NodeMounter) MakeFile(pathname string) error {
return nil
}

// This function is mirrored in ./sanity_test.go to make sure sanity test covered this block of code
// Please mirror the change to func MakeFile in ./sanity_test.go
func (m *NodeMounter) MakeDir(pathname string) error {
err := os.MkdirAll(pathname, os.FileMode(0755))
if err != nil {
Expand All @@ -76,6 +80,8 @@ func (m *NodeMounter) MakeDir(pathname string) error {
return nil
}

// This function is mirrored in ./sanity_test.go to make sure sanity test covered this block of code
// Please mirror the change to func MakeFile in ./sanity_test.go
func (m *NodeMounter) ExistsPath(filename string) (bool, error) {
if _, err := os.Stat(filename); os.IsNotExist(err) {
return false, nil
Expand Down
78 changes: 77 additions & 1 deletion pkg/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"path/filepath"
"regexp"
"strconv"
"strings"

csi "github.com/container-storage-interface/spec/lib/go/csi"
Expand Down Expand Up @@ -65,13 +66,15 @@ var (
nodeCaps = []csi.NodeServiceCapability_RPC_Type{
csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
}
)

// nodeService represents the node service of CSI driver
type nodeService struct {
metadata cloud.MetadataService
mounter Mounter
statter Statter
inFlight *internal.InFlight
driverOptions *DriverOptions
}
Expand All @@ -87,6 +90,7 @@ func newNodeService(driverOptions *DriverOptions) nodeService {
return nodeService{
metadata: metadata,
mounter: newNodeMounter(),
statter: NewStatter(),
inFlight: internal.NewInFlight(),
driverOptions: driverOptions,
}
Expand Down Expand Up @@ -344,7 +348,65 @@ func (d *nodeService) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
}

func (d *nodeService) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
return nil, status.Error(codes.Unimplemented, "NodeGetVolumeStats is not implemented yet")
klog.V(4).Infof("NodeGetVolumeStats: called with args %+v", *req)
if len(req.VolumeId) == 0 {
return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume ID was empty")
}
if len(req.VolumePath) == 0 {
return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume path was empty")
}

exists, err := d.mounter.ExistsPath(req.VolumePath)
if err != nil {
return nil, status.Errorf(codes.Internal, "unknown error when stat on %s: %v", req.VolumePath, err)
}
if !exists {
return nil, status.Errorf(codes.NotFound, "path %s does not exist", req.VolumePath)
}

isBlock, err := d.statter.IsBlockDevice(req.VolumePath)

if err != nil {
return nil, status.Errorf(codes.Internal, "failed to determine whether %s is block device: %v", req.VolumePath, err)
}
if isBlock {
bcap, err := d.getBlockSizeBytes(req.VolumePath)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get block capacity on path %s: %v", req.VolumePath, err)
}
return &csi.NodeGetVolumeStatsResponse{
Usage: []*csi.VolumeUsage{
{
Unit: csi.VolumeUsage_BYTES,
Total: bcap,
},
},
}, nil
}

available, capacity, used, inodesFree, inodes, inodesUsed, err := d.statter.StatFS(req.VolumePath)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get fs info on path %s: %v", req.VolumePath, err)
}

return &csi.NodeGetVolumeStatsResponse{
Usage: []*csi.VolumeUsage{
{
Available: available,
Total: capacity,
Used: used,
Unit: csi.VolumeUsage_BYTES,
},

{
Available: inodesFree,
Total: inodes,
Used: inodesUsed,
Unit: csi.VolumeUsage_INODES,
},
},
}, nil

}

func (d *nodeService) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
Expand Down Expand Up @@ -550,3 +612,17 @@ func hasMountOption(options []string, opt string) bool {
}
return false
}

func (d *nodeService) getBlockSizeBytes(devicePath string) (int64, error) {
cmd := d.mounter.Command("blockdev", "--getsize64", devicePath)
output, err := cmd.Output()
if err != nil {
return -1, fmt.Errorf("error when getting size of block volume at path %s: output: %s, err: %v", devicePath, string(output), err)
}
strOut := strings.TrimSpace(string(output))
gotSizeBytes, err := strconv.ParseInt(strOut, 10, 64)
if err != nil {
return -1, fmt.Errorf("failed to parse size %s as int", strOut)
}
return gotSizeBytes, nil
}
64 changes: 44 additions & 20 deletions pkg/driver/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1170,32 +1170,49 @@ func TestNodeUnpublishVolume(t *testing.T) {
}

func TestNodeGetVolumeStats(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()
testCases := []struct {
name string
testFunc func(t *testing.T)
}{
{
name: "success normal",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()

mockMetadata := mocks.NewMockMetadataService(mockCtl)
mockMounter := mocks.NewMockMounter(mockCtl)
mockMetadata := mocks.NewMockMetadataService(mockCtl)
mockMounter := mocks.NewMockMounter(mockCtl)
mockStatter := mocks.NewMockStatter(mockCtl)
VolumePath := "/test"
one := int64(1)

awsDriver := nodeService{
metadata: mockMetadata,
mounter: mockMounter,
inFlight: internal.NewInFlight(),
}
mockMounter.EXPECT().ExistsPath(VolumePath).Return(true, nil)
mockStatter.EXPECT().IsBlockDevice(VolumePath).Return(false, nil)
mockStatter.EXPECT().StatFS(VolumePath).Return(one, one, one, one, one, one, nil)

expErrCode := codes.Unimplemented
awsDriver := nodeService{
metadata: mockMetadata,
mounter: mockMounter,
statter: mockStatter,
inFlight: internal.NewInFlight(),
}

req := &csi.NodeGetVolumeStatsRequest{}
_, err := awsDriver.NodeGetVolumeStats(context.TODO(), req)
if err == nil {
t.Fatalf("Expected error code %d, got nil", expErrCode)
}
srvErr, ok := status.FromError(err)
if !ok {
t.Fatalf("Could not get error status code from error: %v", srvErr)
req := &csi.NodeGetVolumeStatsRequest{
VolumeId: "vol-test",
VolumePath: VolumePath,
}
_, err := awsDriver.NodeGetVolumeStats(context.TODO(), req)
if err != nil {
t.Fatalf("Expected no error but got err %v", err)
}
},
},
}
if srvErr.Code() != expErrCode {
t.Fatalf("Expected error code %d, got %d message %s", expErrCode, srvErr.Code(), srvErr.Message())

for _, tc := range testCases {
t.Run(tc.name, tc.testFunc)
}

}

func TestNodeGetCapabilities(t *testing.T) {
Expand Down Expand Up @@ -1226,6 +1243,13 @@ func TestNodeGetCapabilities(t *testing.T) {
},
},
},
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
},
},
},
}
expResp := &csi.NodeGetCapabilitiesResponse{Capabilities: caps}

Expand Down
56 changes: 52 additions & 4 deletions pkg/driver/sanity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@ func TestSanity(t *testing.T) {
}
defer os.RemoveAll(dir)

targetPath := filepath.Join(dir, "target")
targetPath := filepath.Join(dir, "mount")
stagingPath := filepath.Join(dir, "staging")
endpoint := "unix://" + filepath.Join(dir, "csi.sock")

config := &sanity.Config{
TargetPath: targetPath,
StagingPath: stagingPath,
Address: endpoint,
TargetPath: targetPath,
StagingPath: stagingPath,
Address: endpoint,
CreateTargetDir: createDir,
CreateStagingDir: createDir,
}

driverOptions := &DriverOptions{
Expand All @@ -54,6 +56,7 @@ func TestSanity(t *testing.T) {
AvailabilityZone: "az",
},
mounter: newFakeMounter(),
statter: NewFakeStatter(),
inFlight: internal.NewInFlight(),
driverOptions: &DriverOptions{},
},
Expand All @@ -73,6 +76,15 @@ func TestSanity(t *testing.T) {
sanity.Test(t, config)
}

func createDir(targetPath string) (string, error) {
if err := os.MkdirAll(targetPath, 0300); err != nil {
if os.IsNotExist(err) {
return "", err
}
}
return targetPath, nil
}

type fakeCloudProvider struct {
disks map[string]*fakeDisk
// snapshots contains mapping from snapshot ID to snapshot
Expand Down Expand Up @@ -309,13 +321,49 @@ func (f *fakeMounter) GetDeviceName(mountPath string) (string, int, error) {
}

func (f *fakeMounter) MakeFile(pathname string) error {
file, err := os.OpenFile(pathname, os.O_CREATE, os.FileMode(0644))
if err != nil {
if !os.IsExist(err) {
return err
}
}
if err = file.Close(); err != nil {
return err
}
return nil
}

func (f *fakeMounter) MakeDir(pathname string) error {
err := os.MkdirAll(pathname, os.FileMode(0755))
if err != nil {
if !os.IsExist(err) {
return err
}
}
return nil
}

func (f *fakeMounter) ExistsPath(filename string) (bool, error) {
if _, err := os.Stat(filename); os.IsNotExist(err) {
return false, nil
} else if err != nil {
return false, err
}
return true, nil
}

type fakeStatter struct{}

func NewFakeStatter() fakeStatter {
return fakeStatter{}
}

func (fakeStatter) StatFS(path string) (available, capacity, used, inodesFree, inodes, inodesUsed int64, err error) {
// Assume the file exists and give some dummy values back
one := int64(1)
return one, one, one, one, one, one, nil
}

func (fakeStatter) IsBlockDevice(fullPath string) (bool, error) {
return false, nil
}
Loading

0 comments on commit 13bf1a3

Please sign in to comment.