From 7f1e04e15631b14f7d3f441cc97bed1a968f2e47 Mon Sep 17 00:00:00 2001 From: Artem Minyaylov Date: Thu, 9 Mar 2023 10:52:53 -0800 Subject: [PATCH] Set mount concurrency to 1 --- cmd/gce-pd-csi-driver/main.go | 20 +++++++++++--------- pkg/mount-manager/safe-mounter_linux.go | 13 +++++-------- pkg/mount-manager/safe-mounter_windows.go | 4 +++- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/cmd/gce-pd-csi-driver/main.go b/cmd/gce-pd-csi-driver/main.go index 948364384..4d45dbe68 100644 --- a/cmd/gce-pd-csi-driver/main.go +++ b/cmd/gce-pd-csi-driver/main.go @@ -59,7 +59,9 @@ var ( waitForOpBackoffSteps = flag.Int("wait-op-backoff-steps", 100, "Steps for wait for operation backoff") waitForOpBackoffCap = flag.Duration("wait-op-backoff-cap", 0, "Cap for wait for operation backoff") - maxprocs = flag.Int("maxprocs", 1, "GOMAXPROCS override") + maxProcs = flag.Int("maxprocs", 1, "GOMAXPROCS override") + maxConcurrentFormat = flag.Int("max-concurrent-format", 1, "The maximum number of concurrent format exec calls") + concurrentFormatTimeout = flag.Duration("concurrent-format-timeout", 1*time.Minute, "The maximum duration of a format operation before its concurrency token is released") version string ) @@ -88,7 +90,7 @@ func main() { func handle() { var err error - runtime.GOMAXPROCS(*maxprocs) + runtime.GOMAXPROCS(*maxProcs) klog.Infof("Sys info: NumCPU: %v MAXPROC: %v", runtime.NumCPU(), runtime.GOMAXPROCS(0)) if version == "" { @@ -110,16 +112,16 @@ func handle() { klog.Fatalf("Bad extra volume labels: %v", err.Error()) } - gceDriver := driver.GetGCEDriver() - - //Initialize GCE Driver ctx, cancel := context.WithCancel(context.Background()) defer cancel() - //Initialize identity server + // Initialize driver + gceDriver := driver.GetGCEDriver() + + // Initialize identity server identityServer := driver.NewIdentityServer(gceDriver) - //Initialize requirements for the controller service + // Initialize requirements for the controller service var controllerServer *driver.GCEControllerServer if *runControllerService { cloudProvider, err := gce.CreateCloudProvider(ctx, version, *cloudConfigFilePath, *computeEndpoint) @@ -133,10 +135,10 @@ func handle() { klog.Warningf("controller service is disabled but cloud config given - it has no effect") } - //Initialize requirements for the node service + // Initialize requirements for the node service var nodeServer *driver.GCENodeServer if *runNodeService { - mounter, err := mountmanager.NewSafeMounter() + mounter, err := mountmanager.NewSafeMounter(*maxConcurrentFormat, *concurrentFormatTimeout) if err != nil { klog.Fatalf("Failed to get safe mounter: %v", err.Error()) } diff --git a/pkg/mount-manager/safe-mounter_linux.go b/pkg/mount-manager/safe-mounter_linux.go index 40f6c7b3e..22d2d8b3f 100644 --- a/pkg/mount-manager/safe-mounter_linux.go +++ b/pkg/mount-manager/safe-mounter_linux.go @@ -17,16 +17,13 @@ limitations under the License. package mountmanager import ( + "time" + "k8s.io/mount-utils" "k8s.io/utils/exec" ) -func NewSafeMounter() (*mount.SafeFormatAndMount, error) { - realMounter := mount.New("") - realExec := exec.New() - return &mount.SafeFormatAndMount{ - Interface: realMounter, - Exec: realExec, - }, nil - +func NewSafeMounter(maxConcurrentFormat int, concurrentFormatTimeout time.Duration) (*mount.SafeFormatAndMount, error) { + opt := mount.WithMaxConcurrentFormat(maxConcurrentFormat, concurrentFormatTimeout) + return mount.NewSafeFormatAndMount(mount.New(""), exec.New(), opt), nil } diff --git a/pkg/mount-manager/safe-mounter_windows.go b/pkg/mount-manager/safe-mounter_windows.go index 23e6dd36a..b37a4009b 100644 --- a/pkg/mount-manager/safe-mounter_windows.go +++ b/pkg/mount-manager/safe-mounter_windows.go @@ -17,6 +17,8 @@ limitations under the License. package mountmanager import ( + "time" + "k8s.io/klog/v2" "k8s.io/mount-utils" utilexec "k8s.io/utils/exec" @@ -58,7 +60,7 @@ type CSIProxyMounter interface { GetDiskTotalBytes(devicePath string) (int64, error) } -func NewSafeMounter() (*mount.SafeFormatAndMount, error) { +func NewSafeMounter(int, time.Duration) (*mount.SafeFormatAndMount, error) { csiProxyMounterV1, err := NewCSIProxyMounterV1() if err == nil { klog.V(4).Infof("using CSIProxyMounterV1, %s", csiProxyMounterV1.GetAPIVersions())