Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: support external-health-monitor #210

Merged
merged 1 commit into from
Jan 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ LABEL description="HostPath Driver"
ARG binary=./bin/hostpathplugin

# Add util-linux to get a new version of losetup.
RUN apk add util-linux
RUN apk add util-linux && apk update && apk upgrade
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this related to "support external-health-monitor"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I implemented this feature, I found that I needed to run the findmnt command with the --json argument, which is only supported in newer versions, so I updated it.

COPY ${binary} /hostpathplugin
ENTRYPOINT ["/hostpathplugin"]
7 changes: 6 additions & 1 deletion cmd/hostpathplugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,10 @@ func handle() {
fmt.Printf("Failed to initialize driver: %s", err.Error())
os.Exit(1)
}
driver.Run()

if err := driver.Run(); err != nil {
fmt.Printf("Failed to run driver: %s", err.Error())
os.Exit(1)

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here? This looks like an unrelated enhancement.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An error may be returned from that function, so I am just checking it, right?

}
30 changes: 30 additions & 0 deletions deploy/kubernetes-1.17/hostpath/csi-hostpath-plugin.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,37 @@ spec:
labels:
app: csi-hostpathplugin
spec:
serviceAccount: csi-external-health-monitor-controller
containers:
- name: csi-external-health-monitor-agent
image: k8s.gcr.io/sig-storage/csi-external-health-monitor-agent:v0.2.0
args:
- "--v=5"
- "--csi-address=$(ADDRESS)"
env:
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: ADDRESS
value: /csi/csi.sock
imagePullPolicy: "IfNotPresent"
volumeMounts:
- name: socket-dir
mountPath: /csi
- name: csi-external-health-monitor-controller
image: k8s.gcr.io/sig-storage/csi-external-health-monitor-controller:v0.2.0
args:
- "--v=5"
- "--csi-address=$(ADDRESS)"
- "--leader-election"
env:
- name: ADDRESS
value: /csi/csi.sock
imagePullPolicy: "IfNotPresent"
volumeMounts:
- name: socket-dir
mountPath: /csi
- name: node-driver-registrar
image: k8s.gcr.io/sig-storage/csi-node-driver-registrar:v2.0.1
args:
Expand Down
5 changes: 4 additions & 1 deletion deploy/util/deploy-hostpath.sh
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ CSI_SNAPSHOTTER_RBAC_YAML="https://raw.githubusercontent.com/kubernetes-csi/exte
CSI_RESIZER_RBAC_YAML="https://raw.githubusercontent.com/kubernetes-csi/external-resizer/$(rbac_version "${BASE_DIR}/hostpath/csi-hostpath-resizer.yaml" csi-resizer false)/deploy/kubernetes/rbac.yaml"
: ${CSI_RESIZER_RBAC:=https://raw.githubusercontent.com/kubernetes-csi/external-resizer/$(rbac_version "${BASE_DIR}/hostpath/csi-hostpath-resizer.yaml" csi-resizer "${UPDATE_RBAC_RULES}")/deploy/kubernetes/rbac.yaml}

CSI_EXTERNALHEALTH_MONITOR_RBAC_YAML="https://raw.githubusercontent.com/kubernetes-csi/external-health-monitor/$(rbac_version "${BASE_DIR}/hostpath/csi-hostpath-plugin.yaml" csi-external-health-monitor-controller false)/deploy/kubernetes/external-health-monitor-controller/rbac.yaml"
: ${CSI_EXTERNALHEALTH_MONITOR_RBAC:=https://raw.githubusercontent.com/kubernetes-csi/external-health-monitor/$(rbac_version "${BASE_DIR}/hostpath/csi-hostpath-plugin.yaml" csi-external-health-monitor-controller "${UPDATE_RBAC_RULES}")/deploy/kubernetes/external-health-monitor-controller/rbac.yaml}

INSTALL_CRD=${INSTALL_CRD:-"false"}

# Some images are not affected by *_REGISTRY/*_TAG and IMAGE_* variables.
Expand All @@ -140,7 +143,7 @@ run () {

# rbac rules
echo "applying RBAC rules"
for component in CSI_PROVISIONER CSI_ATTACHER CSI_SNAPSHOTTER CSI_RESIZER; do
for component in CSI_PROVISIONER CSI_ATTACHER CSI_SNAPSHOTTER CSI_RESIZER CSI_EXTERNALHEALTH_MONITOR; do
eval current="\${${component}_RBAC}"
eval original="\${${component}_RBAC_YAML}"
if [ "$current" != "$original" ]; then
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/golang/protobuf v1.4.3
github.com/kubernetes-csi/csi-lib-utils v0.9.0
github.com/pborman/uuid v1.2.1
github.com/stretchr/testify v1.6.1
golang.org/x/net v0.0.0-20201209123823-ac852fbbde11
golang.org/x/sys v0.0.0-20201207223542-d4d67f95c62d // indirect
google.golang.org/genproto v0.0.0-20201209185603-f92720507ed4 // indirect
Expand Down
28 changes: 11 additions & 17 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion hack/get-sanity.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/sh

VERSION="v1.0.0-rc2"
VERSION="v4.0.2"
SANITYTGZ="csi-sanity-${VERSION}.linux.amd64.tar.gz"

echo "Downloading csi-test from https://github.com/kubernetes-csi/csi-test/releases/download/${VERSION}/${SANITYTGZ}"
Expand Down
77 changes: 72 additions & 5 deletions pkg/hostpath/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,13 @@ func NewControllerServer(ephemeral bool, nodeID string) *controllerServer {
caps: getControllerServiceCapabilities(
[]csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
csi.ControllerServiceCapability_RPC_GET_VOLUME,
csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
csi.ControllerServiceCapability_RPC_LIST_VOLUMES,
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
csi.ControllerServiceCapability_RPC_VOLUME_CONDITION,
}),
nodeID: nodeID,
}
Expand Down Expand Up @@ -279,7 +282,75 @@ func (cs *controllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacit
}

// ListVolumes returns one page of the volumes known to this driver, in the
// order produced by getSortedVolumeIDs, together with the health condition
// that doHealthCheckInControllerSide reports for each of them.
//
// Pagination tokens are 1-based positions in the sorted ID list, encoded as
// decimal strings. An empty StartingToken starts at the first volume. A
// MaxEntries value of zero or less means "no limit". When volumes remain
// after the returned page, NextToken holds the position of the first volume
// of the following page; otherwise it is left empty.
//
// It returns a codes.Aborted status error when StartingToken is not a
// positive integer. The request is never modified.
func (cs *controllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
	volumeRes := &csi.ListVolumesResponse{
		Entries: []*csi.ListVolumesResponse_Entry{},
	}

	volumeIds := getSortedVolumeIDs()
	volumesLength := int64(len(volumeIds))

	startIdx := int64(1)
	if token := req.GetStartingToken(); token != "" {
		parsed, err := strconv.ParseInt(token, 10, 32)
		if err != nil {
			return nil, status.Error(codes.Aborted, "The type of startingToken should be integer")
		}
		// A token below 1 would turn into a negative slice index below.
		if parsed < 1 {
			return nil, status.Error(codes.Aborted, "The startingToken should be a positive integer")
		}
		startIdx = parsed
	}

	// endIdx is the exclusive, 0-based end of the page. MaxEntries counts
	// entries from the starting position, not from the beginning of the list.
	endIdx := volumesLength
	if maxEntries := int64(req.GetMaxEntries()); maxEntries > 0 && startIdx-1+maxEntries < volumesLength {
		endIdx = startIdx - 1 + maxEntries
	}

	for index := startIdx - 1; index < endIdx; index++ {
		volumeID := volumeIds[index]
		hpVolume := hostPathVolumes[volumeID]
		healthy, msg := doHealthCheckInControllerSide(volumeID)
		glog.V(3).Infof("Volume: %s Healthy state: %t", hpVolume.VolName, healthy)
		volumeRes.Entries = append(volumeRes.Entries, &csi.ListVolumesResponse_Entry{
			Volume: &csi.Volume{
				VolumeId:      hpVolume.VolID,
				CapacityBytes: hpVolume.VolSize,
			},
			Status: &csi.ListVolumesResponse_VolumeStatus{
				PublishedNodeIds: []string{hpVolume.NodeID},
				VolumeCondition: &csi.VolumeCondition{
					Abnormal: !healthy,
					Message:  msg,
				},
			},
		})
	}

	// Tell the caller where to resume when the page did not reach the end.
	if endIdx < volumesLength {
		volumeRes.NextToken = strconv.FormatInt(endIdx+1, 10)
	}

	glog.V(5).Infof("Volumes are: %+v", *volumeRes)
	return volumeRes, nil
}

// ControllerGetVolume looks up the requested volume in hostPathVolumes and
// returns its ID, capacity, node ID and the health condition computed by
// doHealthCheckInControllerSide.
//
// It returns a codes.NotFound status error when the volume ID is unknown.
func (cs *controllerServer) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
	volumeID := req.GetVolumeId()
	volume, ok := hostPathVolumes[volumeID]
	if !ok {
		return nil, status.Error(codes.NotFound, "The volume not found")
	}

	healthy, msg := doHealthCheckInControllerSide(volumeID)
	// The labels now match the arguments: volume name first, health flag second.
	glog.V(3).Infof("Volume: %s Healthy state: %t", volume.VolName, healthy)
	return &csi.ControllerGetVolumeResponse{
		Volume: &csi.Volume{
			VolumeId:      volume.VolID,
			CapacityBytes: volume.VolSize,
		},
		Status: &csi.ControllerGetVolumeResponse_VolumeStatus{
			PublishedNodeIds: []string{volume.NodeID},
			VolumeCondition: &csi.VolumeCondition{
				Abnormal: !healthy,
				Message:  msg,
			},
		},
	}, nil
}

// getSnapshotPath returns the full path to where the snapshot is stored
Expand Down Expand Up @@ -527,10 +598,6 @@ func (cs *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi
}, nil
}

// ControllerGetVolume is not supported by this variant: every call is
// answered with a codes.Unimplemented status error and no response.
func (cs *controllerServer) ControllerGetVolume(_ context.Context, _ *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
	var noResponse *csi.ControllerGetVolumeResponse
	return noResponse, status.Error(codes.Unimplemented, "")
}

func convertSnapshot(snap hostPathSnapshot) *csi.ListSnapshotsResponse {
entries := []*csi.ListSnapshotsResponse_Entry{
{
Expand Down
202 changes: 202 additions & 0 deletions pkg/hostpath/healthcheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package hostpath

import (
"encoding/json"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"

"github.com/golang/glog"
fs "k8s.io/kubernetes/pkg/volume/util/fs"
)

const (
// podVolumeTargetPath is the mount target that checkMountPointExist searches
// for among the parsed findmnt entries.
podVolumeTargetPath = "/var/lib/kubelet/pods"
// csiSignOfVolumeTargetPath is not referenced in the visible part of this file.
// NOTE(review): looks like the path fragment used for CSI volume mounts under
// a pod directory — confirm before relying on it.
csiSignOfVolumeTargetPath = "kubernetes.io~csi/pvc"
)

// MountPointInfo is one mount entry decoded from JSON (keys "target",
// "source", "fstype" and "options"). Entries nested under the "children" key
// are decoded recursively into ContainerFileSystem.
type MountPointInfo struct {
Target string `json:"target"`
Source string `json:"source"`
FsType string `json:"fstype"`
Options string `json:"options"`
// ContainerFileSystem holds the mounts listed under this entry's "children" key.
ContainerFileSystem []MountPointInfo `json:"children,omitempty"`
}

// ContainerFileSystem is one element of the top-level "filesystems" array;
// only its "children" list is decoded.
type ContainerFileSystem struct {
Children []MountPointInfo `json:"children"`
}

// FileSystems is the top-level JSON document decoded by parseMountInfo.
type FileSystems struct {
// Filsystem (sic) holds the entries of the "filesystems" array.
Filsystem []ContainerFileSystem `json:"filesystems"`
}

// locateCommandPath searches /bin, /usr/sbin and /usr/bin, in that order, for
// a file named commandName and returns the first match. Directories are never
// reported as a match, so an empty, "." or ".." name yields no result.
//
// It returns the empty string when no directory contains the command.
func locateCommandPath(commandName string) string {
	// Anchoring the name at "/" normalises it (for example it strips any
	// leading "..") before it is appended to a search directory.
	rooted := filepath.Join("/", commandName)
	for _, dir := range []string{"/bin", "/usr/sbin", "/usr/bin"} {
		binPath := filepath.Join(dir, rooted)
		info, err := os.Stat(binPath)
		if err != nil || info.IsDir() {
			continue
		}

		return binPath
	}

	return ""
}

// getSourcePath builds the source path of a volume by appending volumeHandle
// to dataRoot with a single "/" separator.
func getSourcePath(volumeHandle string) string {
	const layout = "%s/%s"
	return fmt.Sprintf(layout, dataRoot, volumeHandle)
}

// checkSourcePathExist reports whether the source path of volumeHandle exists.
// A missing path yields (false, nil); any other os.Stat failure is returned
// as the error.
func checkSourcePathExist(volumeHandle string) (bool, error) {
	sourcePath := getSourcePath(volumeHandle)
	glog.V(3).Infof("Volume: %s Source path is: %s", volumeHandle, sourcePath)

	switch _, statErr := os.Stat(sourcePath); {
	case statErr == nil:
		return true, nil
	case os.IsNotExist(statErr):
		return false, nil
	default:
		return false, statErr
	}
}

// parseMountInfo decodes originalMountInfo as a FileSystems JSON document and
// returns the children of its first "filesystems" entry.
//
// It returns the json.Unmarshal error on malformed input, and an error when
// the document contains no filesystem entries.
func parseMountInfo(originalMountInfo []byte) ([]MountPointInfo, error) {
	var decoded FileSystems
	if err := json.Unmarshal(originalMountInfo, &decoded); err != nil {
		return nil, err
	}

	if len(decoded.Filsystem) == 0 {
		return nil, fmt.Errorf("failed to get mount info")
	}

	first := decoded.Filsystem[0]
	return first.Children, nil
}

// checkMountPointExist runs "findmnt --json" and reports whether a mount
// nested under podVolumeTargetPath has a source containing sourcePath and a
// target that still exists on disk.
//
// sourcePath is matched as a substring of each mount's source; the visible
// caller, doHealthCheckInNodeSide, passes the volume handle.
//
// It returns (false, nil) when no matching mount is found or when the
// matching mount's target does not exist. It returns an error when findmnt
// cannot be located or executed, when its output is empty or unparsable, or
// when the target cannot be inspected.
func checkMountPointExist(sourcePath string) (bool, error) {
	cmdPath := locateCommandPath("findmnt")
	if cmdPath == "" {
		return false, fmt.Errorf("findmnt command not found")
	}

	// Only stdout is captured: mixing stderr into the buffer would corrupt
	// the JSON document whenever findmnt prints a warning.
	out, err := exec.Command(cmdPath, "--json").Output()
	if err != nil {
		glog.V(3).Infof("failed to execute command: %+v", cmdPath)
		return false, err
	}

	if len(out) < 1 {
		return false, fmt.Errorf("mount point info is nil")
	}

	mountInfos, err := parseMountInfo(out)
	if err != nil {
		return false, fmt.Errorf("failed to parse the mount infos: %+v", err)
	}

	// Find the entry for the pod volume directory; its children are the
	// candidate volume mounts. A zero value (no children) is used if absent.
	mountInfosOfPod := MountPointInfo{}
	for _, mountInfo := range mountInfos {
		if mountInfo.Target == podVolumeTargetPath {
			mountInfosOfPod = mountInfo
			break
		}
	}

	for _, mountInfo := range mountInfosOfPod.ContainerFileSystem {
		if !strings.Contains(mountInfo.Source, sourcePath) {
			continue
		}

		// Only the first matching mount decides the result.
		_, err = os.Stat(mountInfo.Target)
		if err != nil {
			if os.IsNotExist(err) {
				return false, nil
			}

			return false, err
		}

		return true, nil
	}

	return false, nil
}

func checkPVCapacityValid(volumeHandle string) (bool, error) {
sourcePath := getSourcePath(volumeHandle)
_, fscapacity, _, _, _, _, err := fs.FsInfo(sourcePath)
if err != nil {
return false, fmt.Errorf("failed to get capacity info: %+v", err)
}

volumeCapacity := hostPathVolumes[volumeHandle].VolSize
glog.V(3).Infof("volume capacity: %+v fs capacity:%+v", volumeCapacity, fscapacity)
return fscapacity >= volumeCapacity, nil
}

func getPVCapacity(volumeHandle string) (int64, int64, int64, error) {
sourcePath := getSourcePath(volumeHandle)
fsavailable, fscapacity, fsused, _, _, _, err := fs.FsInfo(sourcePath)
return fscapacity, fsused, fsavailable, err
}

func checkPVUsage(volumeHandle string) (bool, error) {
sourcePath := getSourcePath(volumeHandle)
fsavailable, _, _, _, _, _, err := fs.FsInfo(sourcePath)
if err != nil {
return false, err
}

glog.V(3).Infof("fs available: %+v", fsavailable)
return fsavailable > 0, nil
}

// doHealthCheckInControllerSide runs the controller-side checks for
// volumeHandle in a fixed order: source path exists, capacity is valid, free
// space is available. It stops at the first check that errors or fails.
//
// It returns (true, "") when every check passes; otherwise false together
// with either the check's error text or the message describing the failure.
func doHealthCheckInControllerSide(volumeHandle string) (bool, string) {
	checks := []struct {
		run     func(string) (bool, error)
		failure string
	}{
		{checkSourcePathExist, "The source path of the volume doesn't exist"},
		{checkPVCapacityValid, "The capacity of volume is greater than actual storage"},
		{checkPVUsage, "The free space of the volume is insufficient"},
	}

	for _, check := range checks {
		passed, err := check.run(volumeHandle)
		if err != nil {
			return false, err.Error()
		}
		if !passed {
			return false, check.failure
		}
	}

	return true, ""
}

// doHealthCheckInNodeSide runs the node-side check for volumeHandle by asking
// checkMountPointExist whether the volume is mounted.
//
// It returns (true, "") when it is mounted; otherwise false together with
// the error text or a message saying the volume isn't mounted.
func doHealthCheckInNodeSide(volumeHandle string) (bool, string) {
	mounted, err := checkMountPointExist(volumeHandle)
	switch {
	case err != nil:
		return false, err.Error()
	case !mounted:
		return false, "The volume isn't mounted"
	}

	return true, ""
}
Loading