Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CA-1.18] #2934 cherry-pick: Fixes 2932: let the capi version to be discovered #3105

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ package clusterapi
import (
"context"
"fmt"
"os"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
Expand All @@ -38,7 +40,9 @@ import (
const (
machineProviderIDIndex = "machineProviderIDIndex"
nodeProviderIDIndex = "nodeProviderIDIndex"
defaultMachineAPI = "v1alpha2.cluster.x-k8s.io"
defaultCAPIGroup = "cluster.x-k8s.io"
// CAPIGroupEnvVar contains the environment variable name which allows overriding defaultCAPIGroup.
CAPIGroupEnvVar = "CAPI_GROUP"
)

// machineController watches for Nodes, Machines, MachineSets and
Expand Down Expand Up @@ -271,37 +275,58 @@ func (c *machineController) machinesInMachineSet(machineSet *MachineSet) ([]*Mac
return result, nil
}

// getCAPIGroup returns a string that specifies the group for the API.
// It will return either the value from the
// CAPI_GROUP environment variable, or the default value i.e cluster.x-k8s.io.
func getCAPIGroup() string {
g := os.Getenv(CAPIGroupEnvVar)
if g == "" {
g = defaultCAPIGroup
}
klog.V(4).Infof("Using API Group %q", g)
return g
}

// newMachineController constructs a controller that watches Nodes,
// Machines and MachineSet as they are added, updated and deleted on
// the cluster.
func newMachineController(
dynamicclient dynamic.Interface,
kubeclient kubeclient.Interface,
discoveryclient discovery.DiscoveryInterface,
) (*machineController, error) {
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeclient, 0)
informerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicclient, 0, metav1.NamespaceAll, nil)

// TODO(alberto): let environment variable to override defaultMachineAPI
machineDeploymentResource, _ := schema.ParseResourceArg(fmt.Sprintf("machinedeployments.%v", defaultMachineAPI))
CAPIGroup := getCAPIGroup()
CAPIVersion, err := getAPIGroupPreferredVersion(discoveryclient, CAPIGroup)
if err != nil {
panic("CAPIVersion")
}
klog.Infof("Using version %q for API group %q", CAPIVersion, CAPIGroup)

machineDeploymentResource, _ := schema.ParseResourceArg(fmt.Sprintf("machinedeployments.%v.%v", CAPIVersion, CAPIGroup))
if machineDeploymentResource == nil {
panic("MachineDeployment")
}

machineSetResource, _ := schema.ParseResourceArg(fmt.Sprintf("machinesets.%v", defaultMachineAPI))
machineSetResource, _ := schema.ParseResourceArg(fmt.Sprintf("machinesets.%v.%v", CAPIVersion, CAPIGroup))
if machineSetResource == nil {
panic("MachineSetResource")
}

machineResource, _ := schema.ParseResourceArg(fmt.Sprintf("machines.%v", defaultMachineAPI))
machineResource, _ := schema.ParseResourceArg(fmt.Sprintf("machines.%v.%v", CAPIVersion, CAPIGroup))
if machineResource == nil {
panic("machineResource")
}

machineInformer := informerFactory.ForResource(*machineResource)
machineSetInformer := informerFactory.ForResource(*machineSetResource)
var machineDeploymentInformer informers.GenericInformer

machineDeploymentInformer = informerFactory.ForResource(*machineDeploymentResource)
machineDeploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{})
machineDeploymentInformer := informerFactory.ForResource(*machineDeploymentResource)

machineInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{})
machineSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{})
machineDeploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{})

nodeInformer := kubeInformerFactory.Core().V1().Nodes().Informer()
nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{})
Expand Down Expand Up @@ -332,6 +357,21 @@ func newMachineController(
}, nil
}

func getAPIGroupPreferredVersion(client discovery.DiscoveryInterface, APIGroup string) (string, error) {
groupList, err := client.ServerGroups()
if err != nil {
return "", fmt.Errorf("failed to get ServerGroups: %v", err)
}

for _, group := range groupList.Groups {
if group.Name == APIGroup {
return group.PreferredVersion.Version, nil
}
}

return "", fmt.Errorf("failed to find API group %q", APIGroup)
}

func (c *machineController) machineSetProviderIDs(machineSet *MachineSet) ([]string, error) {
machines, err := c.machinesInMachineSet(machineSet)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package clusterapi

import (
"fmt"
"os"
"path"
"reflect"
"sort"
Expand All @@ -29,8 +30,10 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
fakediscovery "k8s.io/client-go/discovery/fake"
fakedynamic "k8s.io/client-go/dynamic/fake"
fakekube "k8s.io/client-go/kubernetes/fake"
clientgotesting "k8s.io/client-go/testing"
"k8s.io/utils/pointer"
)

Expand All @@ -53,6 +56,8 @@ type testSpec struct {
rootIsMachineDeployment bool
}

const customCAPIGroup = "custom.x-k8s.io"

func mustCreateTestController(t *testing.T, testConfigs ...*testConfig) (*machineController, testControllerShutdownFunc) {
t.Helper()

Expand All @@ -76,7 +81,19 @@ func mustCreateTestController(t *testing.T, testConfigs ...*testConfig) (*machin

kubeclientSet := fakekube.NewSimpleClientset(nodeObjects...)
dynamicClientset := fakedynamic.NewSimpleDynamicClient(runtime.NewScheme(), machineObjects...)
controller, err := newMachineController(dynamicClientset, kubeclientSet)
discoveryClient := &fakediscovery.FakeDiscovery{
Fake: &clientgotesting.Fake{
Resources: []*v1.APIResourceList{
{
GroupVersion: fmt.Sprintf("%s/v1beta1", customCAPIGroup),
},
{
GroupVersion: fmt.Sprintf("%s/v1alpha3", defaultCAPIGroup),
},
},
},
}
controller, err := newMachineController(dynamicClientset, kubeclientSet, discoveryClient)
if err != nil {
t.Fatal("failed to create test controller")
}
Expand Down Expand Up @@ -136,7 +153,7 @@ func createTestConfigs(specs ...testSpec) []*testConfig {

config.machineSet = &MachineSet{
TypeMeta: v1.TypeMeta{
APIVersion: "cluster.x-k8s.io/v1alpha2",
APIVersion: fmt.Sprintf("%s/v1alpha3", defaultCAPIGroup),
Kind: "MachineSet",
},
ObjectMeta: v1.ObjectMeta{
Expand All @@ -152,7 +169,7 @@ func createTestConfigs(specs ...testSpec) []*testConfig {
} else {
config.machineDeployment = &MachineDeployment{
TypeMeta: v1.TypeMeta{
APIVersion: "cluster.x-k8s.io/v1alpha2",
APIVersion: fmt.Sprintf("%s/v1alpha3", defaultCAPIGroup),
Kind: "MachineDeployment",
},
ObjectMeta: v1.ObjectMeta{
Expand Down Expand Up @@ -211,7 +228,7 @@ func makeLinkedNodeAndMachine(i int, namespace string, owner v1.OwnerReference)

machine := &Machine{
TypeMeta: v1.TypeMeta{
APIVersion: "cluster.x-k8s.io/v1alpha2",
APIVersion: fmt.Sprintf("%s/v1alpha3", defaultCAPIGroup),
Kind: "Machine",
},
ObjectMeta: v1.ObjectMeta{
Expand Down Expand Up @@ -991,3 +1008,121 @@ func TestControllerMachineSetNodeNamesUsingStatusNodeRefName(t *testing.T) {
}
}
}

func TestControllerGetAPIVersionGroup(t *testing.T) {
expected := "mygroup"
if err := os.Setenv(CAPIGroupEnvVar, expected); err != nil {
t.Fatalf("unexpected error: %v", err)
}
observed := getCAPIGroup()
if observed != expected {
t.Fatalf("Wrong Version Group detected, expected %q, got %q", expected, observed)
}

expected = defaultCAPIGroup
if err := os.Setenv(CAPIGroupEnvVar, ""); err != nil {
t.Fatalf("unexpected error: %v", err)
}
observed = getCAPIGroup()
if observed != expected {
t.Fatalf("Wrong Version Group detected, expected %q, got %q", expected, observed)
}
}

func TestControllerGetAPIVersionGroupWithMachineDeployments(t *testing.T) {
testConfig := createMachineDeploymentTestConfig(testNamespace, 1, map[string]string{
nodeGroupMinSizeAnnotationKey: "1",
nodeGroupMaxSizeAnnotationKey: "1",
})
if err := os.Setenv(CAPIGroupEnvVar, customCAPIGroup); err != nil {
t.Fatalf("unexpected error: %v", err)
}

testConfig.machineDeployment.TypeMeta.APIVersion = fmt.Sprintf("%s/v1beta1", customCAPIGroup)
testConfig.machineSet.TypeMeta.APIVersion = fmt.Sprintf("%s/v1beta1", customCAPIGroup)
for _, machine := range testConfig.machines {
machine.TypeMeta.APIVersion = fmt.Sprintf("%s/v1beta1", customCAPIGroup)
}
controller, stop := mustCreateTestController(t, testConfig)
defer stop()

machineDeployments, err := controller.listMachineDeployments(testNamespace, labels.Everything())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if l := len(machineDeployments); l != 1 {
t.Fatalf("Incorrect number of MachineDeployments, expected 1, got %d", l)
}

machineSets, err := controller.listMachineSets(testNamespace, labels.Everything())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if l := len(machineSets); l != 1 {
t.Fatalf("Incorrect number of MachineSets, expected 1, got %d", l)
}

machines, err := controller.listMachines(testNamespace, labels.Everything())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if l := len(machines); l != 1 {
t.Fatalf("Incorrect number of Machines, expected 1, got %d", l)
}

if err := os.Unsetenv(CAPIGroupEnvVar); err != nil {
t.Fatalf("unexpected error: %v", err)
}
}

func TestGetAPIGroupPreferredVersion(t *testing.T) {
testCases := []struct {
description string
APIGroup string
preferredVersion string
error bool
}{
{
description: "find version for default API group",
APIGroup: defaultCAPIGroup,
preferredVersion: "v1alpha3",
error: false,
},
{
description: "find version for another API group",
APIGroup: customCAPIGroup,
preferredVersion: "v1beta1",
error: false,
},
{
description: "API group does not exist",
APIGroup: "does.not.exist",
preferredVersion: "",
error: true,
},
}

discoveryClient := &fakediscovery.FakeDiscovery{
Fake: &clientgotesting.Fake{
Resources: []*v1.APIResourceList{
{
GroupVersion: fmt.Sprintf("%s/v1beta1", customCAPIGroup),
},
{
GroupVersion: fmt.Sprintf("%s/v1alpha3", defaultCAPIGroup),
},
},
},
}
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
version, err := getAPIGroupPreferredVersion(discoveryClient, tc.APIGroup)
if (err != nil) != tc.error {
t.Errorf("expected to have error: %t. Had an error: %t", tc.error, err != nil)
}
if version != tc.preferredVersion {
t.Errorf("expected %v, got: %v", tc.preferredVersion, version)
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -146,12 +147,17 @@ func BuildClusterAPI(opts config.AutoscalingOptions, do cloudprovider.NodeGroupD
klog.Fatalf("could not generate dynamic client for config")
}

kubeclient, err := kubernetes.NewForConfig(externalConfig)
kubeClient, err := kubernetes.NewForConfig(externalConfig)
if err != nil {
klog.Fatalf("create kube clientset failed: %v", err)
}

controller, err := newMachineController(dc, kubeclient)
discoveryClient, err := discovery.NewDiscoveryClientForConfig(externalConfig)
if err != nil {
klog.Fatalf("create discovery client failed: %v", err)
}

controller, err := newMachineController(dc, kubeClient, discoveryClient)
if err != nil {
klog.Fatal(err)
}
Expand Down