diff --git a/pkg/driver/fakes.go b/pkg/driver/fakes.go index 0883c8b313..a701002748 100644 --- a/pkg/driver/fakes.go +++ b/pkg/driver/fakes.go @@ -23,6 +23,7 @@ import ( "github.com/juicedata/juicefs-csi-driver/pkg/juicefs" "github.com/juicedata/juicefs-csi-driver/pkg/k8sclient" "github.com/juicedata/juicefs-csi-driver/pkg/util" + "github.com/juicedata/juicefs-csi-driver/pkg/util/dispatch" ) // NewFakeDriver creates a new mock driver used for testing @@ -36,6 +37,7 @@ func NewFakeDriver(endpoint string, fakeProvider juicefs.Interface) *Driver { vols: make(map[string]int64), }, nodeService: nodeService{ + quotaPool: dispatch.NewPool(defaultQuotaPoolNum), juicefs: fakeProvider, nodeID: "fake-node-id", k8sClient: &k8sclient.K8sClient{Interface: fake.NewSimpleClientset()}, diff --git a/pkg/driver/node.go b/pkg/driver/node.go index 8c1284ff35..d266c19759 100644 --- a/pkg/driver/node.go +++ b/pkg/driver/node.go @@ -38,15 +38,20 @@ import ( "github.com/juicedata/juicefs-csi-driver/pkg/juicefs" "github.com/juicedata/juicefs-csi-driver/pkg/k8sclient" "github.com/juicedata/juicefs-csi-driver/pkg/util" + "github.com/juicedata/juicefs-csi-driver/pkg/util/dispatch" ) var ( nodeCaps = []csi.NodeServiceCapability_RPC_Type{csi.NodeServiceCapability_RPC_GET_VOLUME_STATS} ) -const defaultCheckTimeout = 2 * time.Second +const ( + defaultCheckTimeout = 2 * time.Second + defaultQuotaPoolNum = 4 +) type nodeService struct { + quotaPool *dispatch.Pool csi.UnimplementedNodeServer mount.SafeFormatAndMount juicefs juicefs.Interface @@ -83,6 +88,7 @@ func newNodeService(nodeID string, k8sClient *k8sclient.K8sClient, reg prometheu metrics := newNodeMetrics(reg) jfsProvider := juicefs.NewJfsProvider(mounter, k8sClient) return &nodeService{ + quotaPool: dispatch.NewPool(defaultQuotaPoolNum), SafeFormatAndMount: *mounter, juicefs: jfsProvider, nodeID: nodeID, @@ -194,14 +200,14 @@ func (d *nodeService) NodePublishVolume(ctx context.Context, req *csi.NodePublis } } - go func() { + d.quotaPool.Run(context.Background(), func(ctx context.Context) { err := retry.OnError(retry.DefaultRetry, func(err error) bool { return true }, func() error { - return d.juicefs.SetQuota(context.Background(), secrets, settings, path.Join(subdir, quotaPath), capacity) + return d.juicefs.SetQuota(ctx, secrets, settings, path.Join(subdir, quotaPath), capacity) }) if err != nil { log.Error(err, "set quota failed") } - }() + }) } log.Info("juicefs volume mounted", "volumeId", volumeID, "target", target) diff --git a/pkg/util/dispatch/pool.go b/pkg/util/dispatch/pool.go new file mode 100644 index 0000000000..69459dcaba --- /dev/null +++ b/pkg/util/dispatch/pool.go @@ -0,0 +1,71 @@ +/* + Copyright 2025 Juicedata Inc + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package dispatch + +import ( + "context" + "fmt" +) + +type Pool struct { + Num int + PoolCh chan struct{} +} + +func NewPool(num int) *Pool { + if num < 1 { + num = 1 + } + return &Pool{ + Num: num, + PoolCh: make(chan struct{}, num), + } +} + +func (p *Pool) RunAndWait(ctx context.Context, worker func(ctx context.Context) error) error { + p.PoolCh <- struct{}{} + + errCh := make(chan error, 1) + defer close(errCh) + go func() { + defer func() { + <-p.PoolCh + }() + + errCh <- worker(ctx) + }() + + for { + select { + case <-ctx.Done(): + return fmt.Errorf("context timeout") + case err := <-errCh: + return err + } + } +} + +func (p *Pool) Run(ctx context.Context, worker func(ctx context.Context)) { + go func() { + p.PoolCh <- struct{}{} + defer func() { + <-p.PoolCh + }() + + worker(ctx) + }() +}