Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-1.31] fix: add serial format limit to fix OOM issue when formatting a few disks in parallel in csi-azuredisk-node #2622

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions pkg/azuredisk/azuredisk.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ type DriverCore struct {
removeNotReadyTaint bool
kubeClient kubernetes.Interface
// a timed cache storing volume stats <volumeID, volumeStats>
volStatsCache azcache.Resource
volStatsCache azcache.Resource
maxConcurrentFormat int64
concurrentFormatTimeout int64
}

// Driver is the v1 implementation of the Azure Disk CSI Driver.
Expand Down Expand Up @@ -176,6 +178,8 @@ func newDriverV1(options *DriverOptions) *Driver {
driver.endpoint = options.Endpoint
driver.disableAVSetNodes = options.DisableAVSetNodes
driver.removeNotReadyTaint = options.RemoveNotReadyTaint
driver.maxConcurrentFormat = options.MaxConcurrentFormat
driver.concurrentFormatTimeout = options.ConcurrentFormatTimeout
driver.volumeLocks = volumehelper.NewVolumeLocks()
driver.ioHandler = azureutils.NewOSIOHandler()
driver.hostUtil = hostutil.NewHostUtil()
Expand Down Expand Up @@ -263,7 +267,7 @@ func newDriverV1(options *DriverOptions) *Driver {
}
}

driver.mounter, err = mounter.NewSafeMounter(driver.enableWindowsHostProcess, driver.useCSIProxyGAInterface)
driver.mounter, err = mounter.NewSafeMounter(driver.enableWindowsHostProcess, driver.useCSIProxyGAInterface, int(driver.maxConcurrentFormat), time.Duration(driver.concurrentFormatTimeout)*time.Second)
if err != nil {
klog.Fatalf("Failed to get safe mounter. Error: %v", err)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/azuredisk/azuredisk_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type DriverOptions struct {
Endpoint string
DisableAVSetNodes bool
RemoveNotReadyTaint bool
MaxConcurrentFormat int64
ConcurrentFormatTimeout int64
}

func (o *DriverOptions) AddFlags() *flag.FlagSet {
Expand Down Expand Up @@ -103,6 +105,8 @@ func (o *DriverOptions) AddFlags() *flag.FlagSet {
fs.BoolVar(&o.DisableAVSetNodes, "disable-avset-nodes", false, "disable DisableAvailabilitySetNodes in cloud config for controller")
fs.BoolVar(&o.RemoveNotReadyTaint, "remove-not-ready-taint", true, "remove NotReady taint from node when node is ready")
fs.StringVar(&o.Endpoint, "endpoint", "unix://tmp/csi.sock", "CSI endpoint")
fs.Int64Var(&o.MaxConcurrentFormat, "max-concurrent-format", 2, "maximum number of concurrent format exec calls")
fs.Int64Var(&o.ConcurrentFormatTimeout, "concurrent-format-timeout", 120, "maximum time in seconds duration of a format operation before its concurrency token is released")

return fs
}
3 changes: 2 additions & 1 deletion pkg/azuredisk/azuredisk_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"fmt"
"os"
"reflect"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v5"
"github.com/container-storage-interface/spec/lib/go/csi"
Expand Down Expand Up @@ -143,7 +144,7 @@ func newDriverV2(options *DriverOptions) *DriverV2 {
}
}

driver.mounter, err = mounter.NewSafeMounter(driver.enableWindowsHostProcess, driver.useCSIProxyGAInterface)
driver.mounter, err = mounter.NewSafeMounter(driver.enableWindowsHostProcess, driver.useCSIProxyGAInterface, int(driver.maxConcurrentFormat), time.Duration(driver.concurrentFormatTimeout)*time.Second)
if err != nil {
klog.Fatalf("Failed to get safe mounter. Error: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/azuredisk/fake_azuredisk.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func newFakeDriverV1(ctrl *gomock.Controller) (*fakeDriverV1, error) {
driver.diskController = NewManagedDiskController(driver.cloud)
driver.clientFactory = driver.cloud.ComputeClientFactory

mounter, err := mounter.NewSafeMounter(true, driver.useCSIProxyGAInterface)
mounter, err := mounter.NewSafeMounter(true, driver.useCSIProxyGAInterface, int(driver.maxConcurrentFormat), time.Duration(driver.concurrentFormatTimeout)*time.Second)
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/azuredisk/fake_azuredisk_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ limitations under the License.
package azuredisk

import (
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"go.uber.org/mock/gomock"
"k8s.io/client-go/kubernetes/fake"
Expand Down Expand Up @@ -74,7 +76,7 @@ func newFakeDriverV2(ctrl *gomock.Controller) (*fakeDriverV2, error) {
driver.diskController = NewManagedDiskController(driver.cloud)
driver.clientFactory = driver.cloud.ComputeClientFactory

mounter, err := mounter.NewSafeMounter(true, driver.useCSIProxyGAInterface)
mounter, err := mounter.NewSafeMounter(true, driver.useCSIProxyGAInterface, int(driver.maxConcurrentFormat), time.Duration(driver.concurrentFormatTimeout)*time.Second)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/mounter/fake_safe_mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"runtime"
"strings"
"time"

"k8s.io/mount-utils"
"k8s.io/utils/exec"
Expand All @@ -35,7 +36,7 @@ type FakeSafeMounter struct {
// NewFakeSafeMounter creates a mount.SafeFormatAndMount instance suitable for use in unit tests.
func NewFakeSafeMounter() (*mount.SafeFormatAndMount, error) {
if runtime.GOOS == "windows" {
return NewSafeMounter(true, true)
return NewSafeMounter(true, true, 2, time.Duration(120)*time.Second)
}

fakeSafeMounter := FakeSafeMounter{}
Expand Down
10 changes: 5 additions & 5 deletions pkg/mounter/safe_mounter_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ limitations under the License.
package mounter

import (
"time"

"k8s.io/mount-utils"
utilexec "k8s.io/utils/exec"
)

func NewSafeMounter(_, _ bool) (*mount.SafeFormatAndMount, error) {
return &mount.SafeFormatAndMount{
Interface: mount.New(""),
Exec: utilexec.New(),
}, nil
func NewSafeMounter(_, _ bool, maxConcurrentFormat int, concurrentFormatTimeout time.Duration) (*mount.SafeFormatAndMount, error) {
opt := mount.WithMaxConcurrentFormat(maxConcurrentFormat, concurrentFormatTimeout)
return mount.NewSafeFormatAndMount(mount.New(""), utilexec.New(), opt), nil
}
3 changes: 2 additions & 1 deletion pkg/mounter/safe_mounter_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ package mounter

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestNewSafeMounter(t *testing.T) {
resp, err := NewSafeMounter(true, true)
resp, err := NewSafeMounter(true, true, 2, time.Duration(120)*time.Second)
assert.NotNil(t, resp)
assert.Nil(t, err)
}
9 changes: 4 additions & 5 deletions pkg/mounter/safe_mounter_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os"
"strconv"
"strings"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
disk "github.com/kubernetes-csi/csi-proxy/client/api/disk/v1"
Expand Down Expand Up @@ -411,13 +412,11 @@ func newCSIProxyMounter() (*csiProxyMounter, error) {
}, nil
}

func NewSafeMounter(enableWindowsHostProcess, useCSIProxyGAInterface bool) (*mount.SafeFormatAndMount, error) {
func NewSafeMounter(enableWindowsHostProcess, useCSIProxyGAInterface bool, maxConcurrentFormat int, concurrentFormatTimeout time.Duration) (*mount.SafeFormatAndMount, error) {
if enableWindowsHostProcess {
klog.V(2).Infof("using windows host process mounter")
return &mount.SafeFormatAndMount{
Interface: NewWinMounter(),
Exec: utilexec.New(),
}, nil
opt := mount.WithMaxConcurrentFormat(maxConcurrentFormat, concurrentFormatTimeout)
return mount.NewSafeFormatAndMount(NewWinMounter(), utilexec.New(), opt), nil
} else {
if useCSIProxyGAInterface {
csiProxyMounter, err := newCSIProxyMounter()
Expand Down
Loading