Skip to content

Commit

Permalink
add concurrency control when pull image in daemon
Browse files Browse the repository at this point in the history
Signed-off-by: Abner-1 <[email protected]>
  • Loading branch information
ABNER-1 committed Feb 19, 2025
1 parent 6d2f3f5 commit ec217dd
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 41 deletions.
13 changes: 7 additions & 6 deletions cmd/daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ import (
)

var (
bindAddr = flag.String("addr", ":10221", "The address the metric endpoint and healthz binds to.")
pprofAddr = flag.String("pprof-addr", ":10222", "The address the pprof binds to.")
enablePprof = flag.Bool("enable-pprof", true, "Enable pprof for daemon.")
pluginConfigFile = flag.String("plugin-config-file", "/kruise/CredentialProviderPlugin.yaml", "The path of plugin config file.")
pluginBinDir = flag.String("plugin-bin-dir", "/kruise/plugins", "The path of directory of plugin binaries.")
bindAddr = flag.String("addr", ":10221", "The address the metric endpoint and healthz binds to.")
pprofAddr = flag.String("pprof-addr", ":10222", "The address the pprof binds to.")
enablePprof = flag.Bool("enable-pprof", true, "Enable pprof for daemon.")
pluginConfigFile = flag.String("plugin-config-file", "/kruise/CredentialProviderPlugin.yaml", "The path of plugin config file.")
pluginBinDir = flag.String("plugin-bin-dir", "/kruise/plugins", "The path of directory of plugin binaries.")
maxWorkersForPullImage = flag.Int("max-workers-for-pull-image", -1, "The maximum number of workers for pulling images.")
)

func main() {
Expand All @@ -71,7 +72,7 @@ func main() {
}()
}
ctx := signals.SetupSignalHandler()
d, err := daemon.NewDaemon(cfg, *bindAddr)
d, err := daemon.NewDaemon(cfg, *bindAddr, *maxWorkersForPullImage)
if err != nil {
klog.Fatalf("Failed to new daemon: %v", err)
}
Expand Down
1 change: 1 addition & 0 deletions config/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ spec:
- --logtostderr=true
- -v=5
- --feature-gates=AllAlpha=true,AllBeta=true
- --max-workers-for-pull-image=2
image: controller:latest
imagePullPolicy: Always
securityContext:
Expand Down
27 changes: 15 additions & 12 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,6 @@ import (

utilruntime "k8s.io/apimachinery/pkg/util/runtime"

kruiseapis "github.com/openkruise/kruise/apis"
"github.com/openkruise/kruise/pkg/client"
"github.com/openkruise/kruise/pkg/daemon/containermeta"
"github.com/openkruise/kruise/pkg/daemon/containerrecreate"
daemonruntime "github.com/openkruise/kruise/pkg/daemon/criruntime"
"github.com/openkruise/kruise/pkg/daemon/imagepuller"
daemonoptions "github.com/openkruise/kruise/pkg/daemon/options"
"github.com/openkruise/kruise/pkg/daemon/podprobe"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
"github.com/openkruise/kruise/pkg/features"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
"github.com/prometheus/client_golang/prometheus/promhttp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -48,6 +37,18 @@ import (
"k8s.io/klog/v2"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/metrics"

kruiseapis "github.com/openkruise/kruise/apis"
"github.com/openkruise/kruise/pkg/client"
"github.com/openkruise/kruise/pkg/daemon/containermeta"
"github.com/openkruise/kruise/pkg/daemon/containerrecreate"
daemonruntime "github.com/openkruise/kruise/pkg/daemon/criruntime"
"github.com/openkruise/kruise/pkg/daemon/imagepuller"
daemonoptions "github.com/openkruise/kruise/pkg/daemon/options"
"github.com/openkruise/kruise/pkg/daemon/podprobe"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
"github.com/openkruise/kruise/pkg/features"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
)

const (
Expand Down Expand Up @@ -89,7 +90,7 @@ type daemon struct {
}

// NewDaemon create a daemon
func NewDaemon(cfg *rest.Config, bindAddress string) (Daemon, error) {
func NewDaemon(cfg *rest.Config, bindAddress string, MaxWorkersForPullImages int) (Daemon, error) {

Check warning on line 93 in pkg/daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

pkg/daemon/daemon.go#L93

Added line #L93 was not covered by tests
if cfg == nil {
return nil, fmt.Errorf("cfg can not be nil")
}
Expand Down Expand Up @@ -137,6 +138,8 @@ func NewDaemon(cfg *rest.Config, bindAddress string) (Daemon, error) {
PodInformer: podInformer,
RuntimeFactory: runtimeFactory,
Healthz: healthz,

MaxWorkersForPullImages: MaxWorkersForPullImages,

Check warning on line 142 in pkg/daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

pkg/daemon/daemon.go#L141-L142

Added lines #L141 - L142 were not covered by tests
}

puller, err := imagepuller.NewController(opts, secretManager, cfg)
Expand Down
29 changes: 15 additions & 14 deletions pkg/daemon/imagepuller/imagepuller_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,6 @@ import (
"reflect"
"time"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/client"
kruiseclient "github.com/openkruise/kruise/pkg/client/clientset/versioned"
listersalpha1 "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1"
daemonoptions "github.com/openkruise/kruise/pkg/daemon/options"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
utilimagejob "github.com/openkruise/kruise/pkg/util/imagejob"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -45,6 +38,14 @@ import (
"k8s.io/client-go/tools/reference"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/client"
kruiseclient "github.com/openkruise/kruise/pkg/client/clientset/versioned"
listersalpha1 "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1"
daemonoptions "github.com/openkruise/kruise/pkg/daemon/options"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
utilimagejob "github.com/openkruise/kruise/pkg/util/imagejob"
)

type Controller struct {
Expand All @@ -65,12 +66,12 @@ func NewController(opts daemonoptions.Options, secretManager daemonutil.SecretMa
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: genericClient.KubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(opts.Scheme, v1.EventSource{Component: "kruise-daemon-imagepuller", Host: opts.NodeName})

queue := workqueue.NewNamedRateLimitingQueue(
// Backoff duration from 500ms to 50~55s
// For nodeimage controller will mark a image:tag task failed (not responded for a long time) if daemon does not report status in 60s.
workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 50*time.Second+time.Millisecond*time.Duration(rand.Intn(5000))),
"imagepuller",
)
// Backoff duration from 500ms to 50~55s
// For nodeimage controller will mark an image:tag task failed (not responded for a long time) if daemon does not report status in 60s.
rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 50*time.Second+time.Millisecond*time.Duration(rand.Intn(5000)))
queue := workqueue.NewRateLimitingQueueWithConfig(rateLimiter, workqueue.RateLimitingQueueConfig{
Name: "imagepuller",
})

Check warning on line 74 in pkg/daemon/imagepuller/imagepuller_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/daemon/imagepuller/imagepuller_controller.go#L69-L74

Added lines #L69 - L74 were not covered by tests

informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
Expand All @@ -94,7 +95,7 @@ func NewController(opts daemonoptions.Options, secretManager daemonutil.SecretMa
},
})

puller, err := newRealPuller(opts.RuntimeFactory.GetImageService(), secretManager, recorder)
puller, err := newRealPuller(opts.RuntimeFactory.GetImageService(), secretManager, recorder, opts.MaxWorkersForPullImages)

Check warning on line 98 in pkg/daemon/imagepuller/imagepuller_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/daemon/imagepuller/imagepuller_controller.go#L98

Added line #L98 was not covered by tests
if err != nil {
return nil, fmt.Errorf("failed to new puller: %v", err)
}
Expand Down
38 changes: 32 additions & 6 deletions pkg/daemon/imagepuller/imagepuller_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,17 @@ import (
"sync"
"time"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
runtimeimage "github.com/openkruise/kruise/pkg/daemon/criruntime/imageruntime"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
"github.com/openkruise/kruise/pkg/util"
"golang.org/x/sync/errgroup"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
runtimeimage "github.com/openkruise/kruise/pkg/daemon/criruntime/imageruntime"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
"github.com/openkruise/kruise/pkg/util"
)

const (
Expand All @@ -47,6 +49,8 @@ const (
PullImageFailed = "PullImageFailed"
)

var workerLimitedPool errgroup.Group

type puller interface {
Sync(obj *appsv1alpha1.NodeImage, ref *v1.ObjectReference) error
GetStatus(imageName string) *appsv1alpha1.ImageStatus
Expand All @@ -63,13 +67,17 @@ type realPuller struct {

var _ puller = &realPuller{}

func newRealPuller(runtime runtimeimage.ImageService, secretManager daemonutil.SecretManager, eventRecorder record.EventRecorder) (*realPuller, error) {
func newRealPuller(runtime runtimeimage.ImageService, secretManager daemonutil.SecretManager, eventRecorder record.EventRecorder, MaxWorkersForPullImages int) (*realPuller, error) {

Check warning on line 70 in pkg/daemon/imagepuller/imagepuller_worker.go

View check run for this annotation

Codecov / codecov/patch

pkg/daemon/imagepuller/imagepuller_worker.go#L70

Added line #L70 was not covered by tests
p := &realPuller{
runtime: runtime,
secretManager: secretManager,
eventRecorder: eventRecorder,
workerPools: make(map[string]workerPool),
}
if MaxWorkersForPullImages > 0 {
klog.InfoS("set image pull worker number", "worker", MaxWorkersForPullImages)
workerLimitedPool.SetLimit(MaxWorkersForPullImages)
}

Check warning on line 80 in pkg/daemon/imagepuller/imagepuller_worker.go

View check run for this annotation

Codecov / codecov/patch

pkg/daemon/imagepuller/imagepuller_worker.go#L77-L80

Added lines #L77 - L80 were not covered by tests
return p, nil
}

Expand Down Expand Up @@ -264,6 +272,8 @@ func (w *realWorkerPool) UpdateStatus(status *appsv1alpha1.ImageTagStatus) {
}

func newPullWorker(name string, tagSpec appsv1alpha1.ImageTagSpec, sandboxConfig *appsv1alpha1.SandboxConfig, secrets []v1.Secret, runtime runtimeimage.ImageService, statusUpdater imageStatusUpdater, ref *v1.ObjectReference, eventRecorder record.EventRecorder) *pullWorker {
image := name + ":" + tagSpec.Tag
klog.V(5).InfoS("new pull worker", "image", image)

Check warning on line 276 in pkg/daemon/imagepuller/imagepuller_worker.go

View check run for this annotation

Codecov / codecov/patch

pkg/daemon/imagepuller/imagepuller_worker.go#L275-L276

Added lines #L275 - L276 were not covered by tests
o := &pullWorker{
name: name,
tagSpec: tagSpec,
Expand All @@ -276,7 +286,23 @@ func newPullWorker(name string, tagSpec appsv1alpha1.ImageTagSpec, sandboxConfig
active: true,
stopCh: make(chan struct{}),
}
go o.Run()

go func() {
newStatus := &appsv1alpha1.ImageTagStatus{
Tag: tagSpec.Tag,
Phase: appsv1alpha1.ImagePhaseWaiting,
Version: tagSpec.Version,
}
o.statusUpdater.UpdateStatus(newStatus)

klog.V(5).InfoS("pull worker waiting", "image", image)
workerLimitedPool.Go(func() error {
klog.V(5).InfoS("pull worker start", "image", image)
o.Run()
klog.V(5).InfoS("pull worker end", "image", image)
return nil
})

Check warning on line 304 in pkg/daemon/imagepuller/imagepuller_worker.go

View check run for this annotation

Codecov / codecov/patch

pkg/daemon/imagepuller/imagepuller_worker.go#L289-L304

Added lines #L289 - L304 were not covered by tests
}()
return o
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/daemon/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ limitations under the License.
package options

import (
daemonruntime "github.com/openkruise/kruise/pkg/daemon/criruntime"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"

daemonruntime "github.com/openkruise/kruise/pkg/daemon/criruntime"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
)

type Options struct {
Expand All @@ -32,4 +33,6 @@ type Options struct {

RuntimeFactory daemonruntime.Factory
Healthz *daemonutil.Healthz

MaxWorkersForPullImages int
}
2 changes: 1 addition & 1 deletion pkg/daemon/podprobe/pod_probe_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func (c *Controller) syncUpdateNodePodProbeStatus() error {
klog.ErrorS(err, "NodePodProbe update status failed", "nodeName", c.nodeName)
return err
}
klog.InfoS("NodePodProbe(%s) update status success", "nodeName", c.nodeName, "from", commonutil.DumpJSON(npp.Status), "to", commonutil.DumpJSON(nppClone.Status))
klog.InfoS("NodePodProbe update status success", "nodeName", c.nodeName, "from", commonutil.DumpJSON(npp.Status), "to", commonutil.DumpJSON(nppClone.Status))
return nil
}

Expand Down

0 comments on commit ec217dd

Please sign in to comment.