Skip to content

Commit

Permalink
Merge pull request #6336 from qianlei90/fix-kwok-provider
Browse files Browse the repository at this point in the history
fix(kwok): prevent quitting when scaling down node group
  • Loading branch information
k8s-ci-robot authored Jan 18, 2024
2 parents d31e1cf + e71a123 commit df0ce2d
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 41 deletions.
6 changes: 4 additions & 2 deletions cluster-autoscaler/cloudprovider/builder/builder_kwok.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/kwok"
"k8s.io/autoscaler/cluster-autoscaler/config"

"k8s.io/client-go/informers"
)

// AvailableCloudProviders supported by the cloud provider builder.
Expand All @@ -33,10 +35,10 @@ var AvailableCloudProviders = []string{
// DefaultCloudProvider for Kwok-only build is Kwok.
const DefaultCloudProvider = cloudprovider.KwokProviderName

func buildCloudProvider(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider {
func buildCloudProvider(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter, informerFactory informers.SharedInformerFactory) cloudprovider.CloudProvider {
switch opts.CloudProviderName {
case cloudprovider.KwokProviderName:
return kwok.BuildKwokCloudProvider(opts, do, rl)(opts, do, rl)
return kwok.BuildKwok(opts, do, rl, informerFactory)
}

return nil
Expand Down
26 changes: 13 additions & 13 deletions cluster-autoscaler/cloudprovider/kwok/kwok_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@ import (
"log"
"strconv"
"strings"
"time"

kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"

apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/yaml"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/client-go/kubernetes"
clientscheme "k8s.io/client-go/kubernetes/scheme"
v1lister "k8s.io/client-go/listers/core/v1"
klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
)

const (
Expand Down Expand Up @@ -155,13 +155,19 @@ func createNodegroups(nodes []*apiv1.Node, kubeClient kubernetes.Interface, kc *
}

ngName := getNGName(nodes[i], kc)
if ngName == "" {
klog.Fatalf("%s '%s' for node '%s' not present in the manifest",
kc.status.groupNodesBy, kc.status.key,
nodes[i].GetName())
}

if ngs[ngName] != nil {
ngs[ngName].targetSize += 1
continue
}

ng := parseAnnotations(nodes[i], kc)
ng.name = getNGName(nodes[i], kc)
ng.name = ngName
sanitizeNode(nodes[i])
prepareNode(nodes[i], ng.name)
ng.nodeTemplate = nodes[i]
Expand Down Expand Up @@ -250,6 +256,8 @@ func parseAnnotations(no *apiv1.Node, kc *KwokProviderConfig) *NodeGroup {
}
}

// getNGName returns the node group name of the given k8s node object.
// Return empty string if no node group is found.
func getNGName(no *apiv1.Node, kc *KwokProviderConfig) string {

if no.GetAnnotations()[NGNameAnnotation] != "" {
Expand All @@ -263,16 +271,8 @@ func getNGName(no *apiv1.Node, kc *KwokProviderConfig) string {
case "label":
ngName = no.GetLabels()[kc.status.key]
default:
klog.Fatal("grouping criteria for nodes is not set (expected: 'annotation' or 'label')")
}

if ngName == "" {
klog.Fatalf("%s '%s' for node '%s' not present in the manifest",
kc.status.groupNodesBy, kc.status.key,
no.GetName())
klog.Warning("grouping criteria for nodes is not set (expected: 'annotation' or 'label')")
}

ngName = fmt.Sprintf("%s-%v", ngName, time.Now().Unix())

return ngName
}
4 changes: 2 additions & 2 deletions cluster-autoscaler/cloudprovider/kwok/kwok_nodegroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,9 @@ func (nodeGroup *NodeGroup) IncreaseSize(delta int) error {
if err != nil {
return fmt.Errorf("couldn't create new node '%s': %v", node.Name, err)
}
nodeGroup.targetSize += 1
}

nodeGroup.targetSize = newSize

return nil
}

Expand All @@ -111,6 +110,7 @@ func (nodeGroup *NodeGroup) DeleteNodes(nodes []*apiv1.Node) error {
if err != nil {
return err
}
nodeGroup.targetSize -= 1
}
return nil
}
Expand Down
43 changes: 25 additions & 18 deletions cluster-autoscaler/cloudprovider/kwok/kwok_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,21 @@ import (
"os"
"strings"

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
kubeclient "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
)

// Name returns name of the cloud provider.
Expand Down Expand Up @@ -123,24 +125,28 @@ func (kwok *KwokCloudProvider) GetNodeGpuConfig(node *apiv1.Node) *cloudprovider

// Refresh is called before every main loop and can be used to dynamically update cloud provider state.
// In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh().
// TODO(vadasambar): implement this
func (kwok *KwokCloudProvider) Refresh() error {

// TODO(vadasambar): causes CA to not recognize kwok nodegroups
// needs better implementation
// nodeList, err := kwok.lister.List()
// if err != nil {
// return err
// }
allNodes, err := kwok.allNodesLister.List(labels.Everything())
if err != nil {
klog.ErrorS(err, "failed to list all nodes from lister")
return err
}

targetSizeInCluster := make(map[string]int)

// ngs := []*NodeGroup{}
// for _, no := range nodeList {
// ng := parseAnnotationsToNodegroup(no)
// ng.kubeClient = kwok.kubeClient
// ngs = append(ngs, ng)
// }
for _, node := range allNodes {
ngName := getNGName(node, kwok.config)
if ngName == "" {
continue
}

// kwok.nodeGroups = ngs
targetSizeInCluster[ngName] += 1
}

for _, ng := range kwok.nodeGroups {
ng.targetSize = targetSizeInCluster[ng.Id()]
}

return nil
}
Expand Down Expand Up @@ -245,6 +251,7 @@ func BuildKwokProvider(ko *kwokOptions) (*KwokCloudProvider, error) {
kubeClient: ko.kubeClient,
resourceLimiter: ko.resourceLimiter,
config: kwokConfig,
allNodesLister: ko.allNodesLister,
}, nil
}

Expand Down
147 changes: 143 additions & 4 deletions cluster-autoscaler/cloudprovider/kwok/kwok_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@ import (
"os"
"testing"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"

"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/client-go/kubernetes/fake"
v1lister "k8s.io/client-go/listers/core/v1"
core "k8s.io/client-go/testing"
Expand Down Expand Up @@ -156,6 +157,110 @@ func TestNodeGroups(t *testing.T) {
})
}

func TestRefresh(t *testing.T) {
fakeClient := &fake.Clientset{}
var nodesFrom string
fakeClient.Fake.AddReactor("get", "configmaps", func(action core.Action) (bool, runtime.Object, error) {
getAction := action.(core.GetAction)

if getAction == nil {
return false, nil, nil
}

if getAction.GetName() == defaultConfigName {
if nodesFrom == "configmap" {
return true, &apiv1.ConfigMap{
Data: map[string]string{
configKey: testConfig,
},
}, nil
}

return true, &apiv1.ConfigMap{
Data: map[string]string{
configKey: testConfigDynamicTemplates,
},
}, nil

}

if getAction.GetName() == defaultTemplatesConfigName {
if nodesFrom == "configmap" {
return true, &apiv1.ConfigMap{
Data: map[string]string{
templatesKey: testTemplates,
},
}, nil
}
}

return true, nil, errors.NewNotFound(apiv1.Resource("configmaps"), "whatever")
})

os.Setenv("POD_NAMESPACE", "kube-system")

t.Run("refresh nodegroup target size", func(t *testing.T) {
nodesFrom = "configmap"
ngName := "kind-worker"
fakeNodeLister := newTestAllNodeLister(map[string]*apiv1.Node{
"node1": {
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
Labels: map[string]string{
"kwok-nodegroup": ngName,
},
},
},
"node2": {
ObjectMeta: metav1.ObjectMeta{
Name: "node2",
Labels: map[string]string{
"kwok-nodegroup": ngName,
},
},
},
"node3": {
ObjectMeta: metav1.ObjectMeta{
Name: "node3",
Labels: map[string]string{
"kwok-nodegroup": ngName,
},
},
},
"node4": {
ObjectMeta: metav1.ObjectMeta{
Name: "node4",
},
},
})

ko := &kwokOptions{
kubeClient: fakeClient,
autoscalingOpts: &config.AutoscalingOptions{},
discoveryOpts: &cloudprovider.NodeGroupDiscoveryOptions{},
resourceLimiter: cloudprovider.NewResourceLimiter(
map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000},
map[string]int64{cloudprovider.ResourceNameCores: 10, cloudprovider.ResourceNameMemory: 100000000}),
allNodesLister: fakeNodeLister,
ngNodeListerFn: testNodeLister,
}

p, err := BuildKwokProvider(ko)
assert.NoError(t, err)
assert.NotNil(t, p)

err = p.Refresh()
assert.Nil(t, err)
for _, ng := range p.NodeGroups() {
if ng.Id() == ngName {
targetSize, err := ng.TargetSize()
assert.Nil(t, err)
assert.Equal(t, 3, targetSize)
}
}
})
}

func TestGetResourceLimiter(t *testing.T) {
fakeClient := &fake.Clientset{}
fakeClient.Fake.AddReactor("get", "configmaps", func(action core.Action) (bool, runtime.Object, error) {
Expand Down Expand Up @@ -639,6 +744,40 @@ func TestNodeGroupForNode(t *testing.T) {
assert.Contains(t, ng.Id(), "kind-worker")
})

t.Run("empty nodegroup name for node", func(t *testing.T) {
nodesFrom = "configmap"
fakeNodeLister := newTestAllNodeLister(map[string]*apiv1.Node{})

ko := &kwokOptions{
kubeClient: fakeClient,
autoscalingOpts: &config.AutoscalingOptions{},
discoveryOpts: &cloudprovider.NodeGroupDiscoveryOptions{},
resourceLimiter: cloudprovider.NewResourceLimiter(
map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000},
map[string]int64{cloudprovider.ResourceNameCores: 10, cloudprovider.ResourceNameMemory: 100000000}),
allNodesLister: fakeNodeLister,
ngNodeListerFn: testNodeLister,
}

p, err := BuildKwokProvider(ko)
assert.NoError(t, err)
assert.NotNil(t, p)
assert.Len(t, p.nodeGroups, 1)
assert.Contains(t, p.nodeGroups[0].Id(), "kind-worker")

testNode := &apiv1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-without-labels",
},
Spec: apiv1.NodeSpec{
ProviderID: "kwok:random-instance-id",
},
}
ng, err := p.NodeGroupForNode(testNode)
assert.NoError(t, err)
assert.Nil(t, ng)
})

}

func TestBuildKwokProvider(t *testing.T) {
Expand Down
7 changes: 5 additions & 2 deletions cluster-autoscaler/cloudprovider/kwok/kwok_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ package kwok

import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
listersv1 "k8s.io/client-go/listers/core/v1"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/client-go/kubernetes"
listersv1 "k8s.io/client-go/listers/core/v1"
)

// KwokCloudProvider implements CloudProvider interface for kwok
Expand All @@ -32,6 +33,8 @@ type KwokCloudProvider struct {
resourceLimiter *cloudprovider.ResourceLimiter
// kubeClient is to be used only for create, delete and update
kubeClient kubernetes.Interface
//allNodesLister is a lister to list all nodes in cluster
allNodesLister listersv1.NodeLister
}

type kwokOptions struct {
Expand Down

0 comments on commit df0ce2d

Please sign in to comment.