Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support CSI start AlluxioFuse process in separate pod #15221

Merged
merged 18 commits into from
Apr 12, 2022

Conversation

ssz1997
Copy link
Contributor

@ssz1997 ssz1997 commented Mar 29, 2022

What changes are proposed in this pull request?

Making CSI launch a separate pod running AlluxioFuse process, instead of launcing AlluxioFuse process in the CSI nodeserver container

Why are the changes needed?

If nodeserver container or node-plugin pod for any reason is down, we lose Alluxio Fuse process and it's very cumbersome to bring it back. With a separate Fuse pod, CSI pod won't affect Fuse process.

Solves #14917

Does this PR introduce any user facing changes?

  1. Removed javaOptions from csi section in values.yaml. Alluxio properties in helm chart should be organized in one place, not in properties and in csi.
  2. Add property mountInPod in csi section. If set to true, Fuse process is launched in the separate pod.

@ssz1997
Copy link
Contributor Author

ssz1997 commented Mar 31, 2022

@Binyang2014 Feel free to review

Copy link
Contributor

@jiacheliu3 jiacheliu3 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly I know nothing about CSI as of now, so I could only review the helm chart part :(

Comment on lines 662 to 666
cpu: 4
memory: 8G
requests:
cpu: "1"
memory: "1G"
cpu: 10m
memory: 300Mi
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is a FUSE daemon which is a JVM? Will a low request and high limit cause the JVM heap resize? Would it be better to allocate more resources at first? Did you get this 300Mi from some test or estimation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a little complicated. Should've commented beforehand. We now support two modes. One is Fuse daemon is in this container, which requires more resources. The other mode is Fuse daemon is in another pod. In this case this container requires much less resources. The mode depends on the property mountInPod, so I'm not sure what's the best way to allocate resources

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's modal based on mountInPod I'd stick with the default being the greedy request, and then conditionally use some lower value in the template file (i.e: if .Values.csi.mountInPod).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes a lot of sense. Thanks

Copy link
Contributor Author

@ssz1997 ssz1997 Mar 31, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually let's just keep the resources as they were. If there's no fuse process in the container, 1 cpu and 1G memory is probably excessive but I don't think it's gonna hurt

# for csi client
clientEnabled: false
accessModes:
- ReadWriteMany
- ReadWriteOnce
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any implications of changing this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our csi controller actually only supports ReadWriteOnce. It will error out if the accessMode is any other.

for _, cap := range req.VolumeCapabilities {
if cap.GetAccessMode().GetMode() != supportedAccessMode.GetMode() {
return &csi.ValidateVolumeCapabilitiesResponse{Message: "Only single node writer is supported"}, nil
}
}

Copy link
Contributor

@ZhuTopher ZhuTopher left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't able to provide much insight for the CSI changes, k8s library stuff looks fine to me. Some minor style nits here and there. Also remember to update the Helm Chart.md and CHANGELOG.md, thanks!

@@ -49,6 +53,7 @@ func (d *driver) newNodeServer() *nodeServer {
return &nodeServer{
nodeId: d.nodeId,
DefaultNodeServer: csicommon.NewDefaultNodeServer(d.csiDriver),
client: d.client,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: misaligned whitespace

Comment on lines 15 to 21
"fmt"
csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"
"io/ioutil"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: idr what our import style conventions were, but I'd prefer to keep the Golang native libraries in a separate block from packages like github.com or k8s.io

csiFuseObj, grpVerKind, err := scheme.Codecs.UniversalDeserializer().Decode(csiFuseYaml, nil, nil)
if err != nil {
glog.V(4).Info("Failed to decode csi-fuse config yaml file")
return nil, status.Errorf(codes.NotFound, "Failed to decode csi-fuse config yaml file.\n", err.Error())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a more fitting error code than codes.NotFound?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing to codes.Internal. Indeed this is not a NotFound.

Comment on lines 662 to 666
cpu: 4
memory: 8G
requests:
cpu: "1"
memory: "1G"
cpu: 10m
memory: 300Mi
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's modal based on mountInPod I'd stick with the default being the greedy request, and then conditionally use some lower value in the template file (i.e: if .Values.csi.mountInPod).

@ssz1997
Copy link
Contributor Author

ssz1997 commented Apr 1, 2022

@jiacheliu3 @ZhuTopher PTAL. Thanks!

Copy link
Contributor

@jiacheliu3 jiacheliu3 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the helm chart parts LGTM except one small nit, thanks!

kind: Pod
apiVersion: v1
metadata:
name: {{ $fullName }}-fuse-
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
name: {{ $fullName }}-fuse-
name: {{ $fullName }}-fuse

Copy link
Contributor

@Binyang2014 Binyang2014 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After this change. If two jobs use the same volume, then these two jobs will share the same fuse daemon. This will break the isolation we assumed before. And if Job A use I/O heavily, due to the thread number limitation, Job B will be impacted. Even more, if Job A cause fuse daemon crashed, Job B will also crashed. The idea solution is job A and job B use the different fuse daemons and job submitter can config the fuse daemon resource limitation based on their requirements.

return nodePublishVolumeMountPod(req)
}

func nodePublishVolumeMountProcess(req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make it as a member function of nodeServer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why should we do it? We are not using the nodeServer in it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can ignore this if not make sense to you

if err != nil {
return nil, err
}
if _, err := ns.client.CoreV1().Pods("default").Create(fusePod); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should in the same namespace with nodeserver, not default namespace

if nodeId == "" {
return nil, status.Errorf(codes.InvalidArgument, "nodeID is missing in the csi setup.\n%v", err.Error())
}
csiFusePodObj.Name = csiFusePodObj.Name + nodeId
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if multi-pod mount the different volume in the same node? Will they use the same pod name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Thank you.


0.6.41

- Remove javaOptions under CSI
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove javaOptions?

Copy link
Contributor Author

@ssz1997 ssz1997 Apr 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be there. Adding it back.

@@ -94,3 +102,19 @@ func startReaper() {
}
}()
}

func startKubeClient() (*kubernetes.Clientset, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change to newKubeClient?

Comment on lines 121 to 135
i := 0
for i < 10 {
time.Sleep(3 * time.Second)
command := exec.Command("bash", "-c", fmt.Sprintf("mount | grep %v | grep alluxio-fuse", stagingPath))
stdout, err := command.CombinedOutput()
if err != nil {
glog.V(3).Infoln("Alluxio is not mounted.")
}
if len(stdout) > 0 {
break
}
i++
}
if i == 10 {
glog.V(3).Infoln("alluxio-fuse is not mounted to global mount point in 30s.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this to NodeStageVolume? After NodeStageVolume we should make sure the volume already ready in this node

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes a lot of sense. Thanks

@ssz1997
Copy link
Contributor Author

ssz1997 commented Apr 4, 2022

After this change. If two jobs use the same volume, then these two jobs will share the same fuse daemon. This will break the isolation we assumed before. And if Job A use I/O heavily, due to the thread number limitation, Job B will be impacted. Even more, if Job A cause fuse daemon crashed, Job B will also crashed. The idea solution is job A and job B use the different fuse daemons and job submitter can config the fuse daemon resource limitation based on their requirements.

@Binyang2014 Thank you so much for your review. I think these are good points.

However I'm wondering, if job A and B are different workloads (one I/O heavy and one not), does it make more sense to use different pv/pvc, so that they will launch separate Fuse pods and thus using different Fuse processes?

Do you prefer to just getting rid of the current way of launching Fuse processes in the nodeserver container, and launching them in different pods instead?

@Binyang2014
Copy link
Contributor

After this change. If two jobs use the same volume, then these two jobs will share the same fuse daemon. This will break the isolation we assumed before. And if Job A use I/O heavily, due to the thread number limitation, Job B will be impacted. Even more, if Job A cause fuse daemon crashed, Job B will also crashed. The idea solution is job A and job B use the different fuse daemons and job submitter can config the fuse daemon resource limitation based on their requirements.

@Binyang2014 Thank you so much for your review. I think these are good points.

However I'm wondering, if job A and B are different workloads (one I/O heavy and one not), does it make more sense to use different pv/pvc, so that they will launch separate Fuse pods and thus using different Fuse processes?

Do you prefer to just getting rid of the current way of launching Fuse processes in the nodeserver container, and launching them in different pods instead?

For AI scenario, most of jobs sharing a few well-known datasets such as ImageNet. Users may tune different model based on the dataset. So group/cluster admin may create one PV/PVC and all these jobs attached the same volume. Since we don't known the processing speed for each model. So maybe some model is processing faster than others. I agree admin can create different PV/PVC for different jobs, but this method seems not recommended by Kubernetes

For second question.
Yes, launching them in different pods will bring benefits for resource isolation and platform upgrade. There are two problems for current approach:

  1. isolation: we can not cap the cpu/memory usage for each fuse daemon, if some less import jobs consuming data too fast, it will impact other critical jobs
  2. Upgrading node-server will kill the fuse-daemon as well as the related jobs
    So from my point of view, after migrate to pod method, we need to solve above problems.

@ssz1997
Copy link
Contributor Author

ssz1997 commented Apr 7, 2022

@Binyang2014 Thanks for the clarifications.

For next step, we plan to have two modes for the pod method: 1. Jobs using the same pv/pvc share one fuse daemon; 2. Each job always has its own fuse daemon.

For the problem #2 you mentioned, I believe as long as the fuse process is in a different pod, nodeserver upgrading should not kill the fuse process. Then the second mode in which each job has its own fuse daemons, we may be able to pass in some cap to limit its resource consumption when fuse pod is started, which resolves the problem #1.

@@ -122,7 +126,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
glog.V(3).Infof("Invalid delete volume req: %v", req)
return nil, err
}
glog.V(4).Infof("Deleting volume %s", volumeID)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't do anything here. The log is misleading.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the returned value not trigger a deletion? If so why is the return type not a CreateVolumeResponse?
return &csi.DeleteVolumeResponse{}, nil

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the returned value will trigger the deletion of the pv. However that is not happening inside this function, so logging should also not be here. Plus we are not removing any data stored in Alluxio

Copy link
Contributor

@ZhuTopher ZhuTopher left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM, thanks for all this work Shawn!

Comment on lines 173 to 205
if req.GetVolumeContext()["mountInPod"] == "true" {
ns.mutex.Lock()
defer ns.mutex.Unlock()

glog.V(4).Infoln("Creating Alluxio-fuse pod and mounting Alluxio to global mount point.")
fusePod, err := getAndCompleteFusePodObj(ns.nodeId, req)
if err != nil {
return nil, err
}
if _, err := ns.client.CoreV1().Pods(os.Getenv("NAMESPACE")).Create(fusePod); err != nil {
return nil, status.Errorf(codes.Internal, "Failed to launch Fuse Pod at %v.\n%v", ns.nodeId, err.Error())
}
glog.V(4).Infoln("Successfully creating Fuse pod.")

// Wait for alluxio-fuse pod finishing mount to global mount point
i := 0
for i < 12 {
time.Sleep(5 * time.Second)
command := exec.Command("bash", "-c", fmt.Sprintf("mount | grep %v | grep alluxio-fuse", req.GetStagingTargetPath()))
stdout, err := command.CombinedOutput()
if err != nil {
glog.V(3).Infoln("Alluxio is not mounted yet.")
}
if len(stdout) > 0 {
break
}
i++
}
if i == 12 {
glog.V(3).Infoln("alluxio-fuse is not mounted to global mount point in 60s.")
return nil, status.Error(codes.DeadlineExceeded, "alluxio-fuse is not mounted to global mount point in 60s")
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make the timeout & retries configurable? I can imagine this being very varied.

Maybe we could define a readiness probe on the FUSE pod to indicate that it has finished mounting and wait on that through the K8s API, but for now the timeout is fine imo.

Comment on lines 188 to 205
i := 0
for i < 12 {
time.Sleep(5 * time.Second)
command := exec.Command("bash", "-c", fmt.Sprintf("mount | grep %v | grep alluxio-fuse", req.GetStagingTargetPath()))
stdout, err := command.CombinedOutput()
if err != nil {
glog.V(3).Infoln("Alluxio is not mounted yet.")
}
if len(stdout) > 0 {
break
}
i++
}
if i == 12 {
glog.V(3).Infoln("alluxio-fuse is not mounted to global mount point in 60s.")
return nil, status.Error(codes.DeadlineExceeded, "alluxio-fuse is not mounted to global mount point in 60s")
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style nit: Assume the exterior of the for-loop is the error case if you are waiting on a timeout with retries.

So at the top of the method you'd need:

if req.GetVolumeContext()["mountInPod"] == "false" {
    return &csi.NodeStageVolumeResponse{}, nil
}

And then this for-loop would look like the following:

Suggested change
i := 0
for i < 12 {
time.Sleep(5 * time.Second)
command := exec.Command("bash", "-c", fmt.Sprintf("mount | grep %v | grep alluxio-fuse", req.GetStagingTargetPath()))
stdout, err := command.CombinedOutput()
if err != nil {
glog.V(3).Infoln("Alluxio is not mounted yet.")
}
if len(stdout) > 0 {
break
}
i++
}
if i == 12 {
glog.V(3).Infoln("alluxio-fuse is not mounted to global mount point in 60s.")
return nil, status.Error(codes.DeadlineExceeded, "alluxio-fuse is not mounted to global mount point in 60s")
}
}
for i := 0; i < 12; i++ {
time.Sleep(5 * time.Second)
command := exec.Command("bash", "-c", fmt.Sprintf("mount | grep %v | grep alluxio-fuse", req.GetStagingTargetPath()))
stdout, err := command.CombinedOutput()
if err != nil {
glog.V(3).Infoln("Alluxio is not mounted yet.")
}
if len(stdout) > 0 {
return &csi.NodeStageVolumeResponse{}, nil
}
}
}
glog.V(3).Infoln("alluxio-fuse is not mounted to global mount point in 60s.")
return nil, status.Error(codes.DeadlineExceeded, "alluxio-fuse is not mounted to global mount point in 60s")

Comment on lines 72 to 75
privileged: true
capabilities:
add:
- SYS_ADMIN
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'd be good practice to have in-line comments explaining why these are necessary. Same goes for any other CSI files where this is the case.

Comment on lines +191 to +209
retry, err := strconv.Atoi(os.Getenv("FAILURE_THRESHOLD"))
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Cannot convert failure threshold %v to int.", os.Getenv("FAILURE_THRESHOLD"))
}
timeout, err := strconv.Atoi(os.Getenv("PERIOD_SECONDS"))
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Cannot convert period seconds %v to int.", os.Getenv("PERIOD_SECONDS"))
}
for i:= 0; i < retry; i++ {
time.Sleep(time.Duration(timeout) * time.Second)
command := exec.Command("bash", "-c", fmt.Sprintf("mount | grep %v | grep alluxio-fuse", req.GetStagingTargetPath()))
stdout, err := command.CombinedOutput()
if err != nil {
glog.V(3).Infoln(fmt.Sprintf("Alluxio is not mounted in %v seconds.", i * timeout))
}
if len(stdout) > 0 {
return &csi.NodeStageVolumeResponse{}, nil
}
}
Copy link
Contributor

@Binyang2014 Binyang2014 Apr 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking if we can leverage Kubernetes retry policy. We can let this method retry error if fuse-daemon not ready. Then k8s will retry automatically. So we don't need to write this logic by our own.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be in the next step.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean seems we can simply write as flowing:

Suggested change
retry, err := strconv.Atoi(os.Getenv("FAILURE_THRESHOLD"))
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Cannot convert failure threshold %v to int.", os.Getenv("FAILURE_THRESHOLD"))
}
timeout, err := strconv.Atoi(os.Getenv("PERIOD_SECONDS"))
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Cannot convert period seconds %v to int.", os.Getenv("PERIOD_SECONDS"))
}
for i:= 0; i < retry; i++ {
time.Sleep(time.Duration(timeout) * time.Second)
command := exec.Command("bash", "-c", fmt.Sprintf("mount | grep %v | grep alluxio-fuse", req.GetStagingTargetPath()))
stdout, err := command.CombinedOutput()
if err != nil {
glog.V(3).Infoln(fmt.Sprintf("Alluxio is not mounted in %v seconds.", i * timeout))
}
if len(stdout) > 0 {
return &csi.NodeStageVolumeResponse{}, nil
}
}
command := exec.Command("bash", "-c", fmt.Sprintf("mount | grep %v | grep alluxio-fuse", req.GetStagingTargetPath()))
stdout, err := command.CombinedOutput()
if err != nil {
glog.V(3).Infoln(fmt.Sprintf("Alluxio mount point is not ready"))
return err
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean if fuse is not ready we just return the error, and let CSI recall this method again? But the later calls will first find out that the pod already exists and directly returns success and won't check the mount point again.
Am I interpreting it right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, so we'd better use pod readiness probe. If the pod not ready, we return error directly then let CSI recall this method again, if it already ready return succeed. We should not rely on if pod existed to pass the check. Is it make sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it makes sense. Just to clarify, here we are checking if Alluxio fuse has mounted Alluxio to mount point, not if the pod exists. I will work on the readiness probe soon.

if err != nil {
return nil, err
}
if _, err := ns.client.CoreV1().Pods(os.Getenv("NAMESPACE")).Create(fusePod); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the pod already crated by previous request? Make it idempotent?

@ssz1997
Copy link
Contributor Author

ssz1997 commented Apr 11, 2022

@Binyang2014 If you think the PR is good to go, please approve it. Thanks!

Copy link
Contributor

@Binyang2014 Binyang2014 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ssz1997 for this change

Comment on lines +191 to +209
retry, err := strconv.Atoi(os.Getenv("FAILURE_THRESHOLD"))
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Cannot convert failure threshold %v to int.", os.Getenv("FAILURE_THRESHOLD"))
}
timeout, err := strconv.Atoi(os.Getenv("PERIOD_SECONDS"))
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Cannot convert period seconds %v to int.", os.Getenv("PERIOD_SECONDS"))
}
for i:= 0; i < retry; i++ {
time.Sleep(time.Duration(timeout) * time.Second)
command := exec.Command("bash", "-c", fmt.Sprintf("mount | grep %v | grep alluxio-fuse", req.GetStagingTargetPath()))
stdout, err := command.CombinedOutput()
if err != nil {
glog.V(3).Infoln(fmt.Sprintf("Alluxio is not mounted in %v seconds.", i * timeout))
}
if len(stdout) > 0 {
return &csi.NodeStageVolumeResponse{}, nil
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean seems we can simply write as flowing:

Suggested change
retry, err := strconv.Atoi(os.Getenv("FAILURE_THRESHOLD"))
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Cannot convert failure threshold %v to int.", os.Getenv("FAILURE_THRESHOLD"))
}
timeout, err := strconv.Atoi(os.Getenv("PERIOD_SECONDS"))
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Cannot convert period seconds %v to int.", os.Getenv("PERIOD_SECONDS"))
}
for i:= 0; i < retry; i++ {
time.Sleep(time.Duration(timeout) * time.Second)
command := exec.Command("bash", "-c", fmt.Sprintf("mount | grep %v | grep alluxio-fuse", req.GetStagingTargetPath()))
stdout, err := command.CombinedOutput()
if err != nil {
glog.V(3).Infoln(fmt.Sprintf("Alluxio is not mounted in %v seconds.", i * timeout))
}
if len(stdout) > 0 {
return &csi.NodeStageVolumeResponse{}, nil
}
}
command := exec.Command("bash", "-c", fmt.Sprintf("mount | grep %v | grep alluxio-fuse", req.GetStagingTargetPath()))
stdout, err := command.CombinedOutput()
if err != nil {
glog.V(3).Infoln(fmt.Sprintf("Alluxio mount point is not ready"))
return err
}

return nil, err
}
if _, err := ns.client.CoreV1().Pods(os.Getenv("NAMESPACE")).Create(fusePod); err != nil {
if strings.Contains(err.Error(), "already exists") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using http code 409 conflict for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

http code exists in the http result object, and when err is not nil, we won't return the http result.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HelloHorizon
Copy link
Contributor

alluxio-bot, merge this please

@alluxio-bot alluxio-bot merged commit 7ed2f64 into Alluxio:master Apr 12, 2022
flaming-archer pushed a commit to flaming-archer/alluxio that referenced this pull request Sep 1, 2022
…e pod

Making CSI launch a separate pod running AlluxioFuse process, instead of
launcing AlluxioFuse process in the CSI nodeserver container

If nodeserver container or node-plugin pod for any reason is down, we
lose Alluxio Fuse process and it's very cumbersome to bring it back.
With a separate Fuse pod, CSI pod won't affect Fuse process.

Solves Alluxio#14917

1. Removed `javaOptions` from csi section in `values.yaml`. Alluxio
properties in helm chart should be organized in one place, not in
`properties` and in `csi`.
2. Add property `mountInPod` in csi section. If set to `true`, Fuse
process is launched in the separate pod.

pr-link: Alluxio#15221
change-id: cid-b6897172e11f80618decbfdc0758423e71aa387e
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants