Skip to content

Commit

Permalink
Merge pull request kubernetes#2934 from enxebre/fix-2932
Browse files Browse the repository at this point in the history
Fixes 2932: let the capi version to be discovered
  • Loading branch information
k8s-ci-robot authored Mar 16, 2020
2 parents a469c85 + 7082cfe commit ee627f2
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ package clusterapi
import (
"context"
"fmt"
"os"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
Expand All @@ -38,7 +40,9 @@ import (
const (
machineProviderIDIndex = "machineProviderIDIndex"
nodeProviderIDIndex = "nodeProviderIDIndex"
defaultMachineAPI = "v1alpha2.cluster.x-k8s.io"
defaultCAPIGroup = "cluster.x-k8s.io"
// CAPIGroupEnvVar contains the environment variable name which allows overriding defaultCAPIGroup.
CAPIGroupEnvVar = "CAPI_GROUP"
)

// machineController watches for Nodes, Machines, MachineSets and
Expand Down Expand Up @@ -271,37 +275,58 @@ func (c *machineController) machinesInMachineSet(machineSet *MachineSet) ([]*Mac
return result, nil
}

// getCAPIGroup returns a string that specifies the group for the API.
// It will return either the value from the
// CAPI_GROUP environment variable, or the default value i.e cluster.x-k8s.io.
func getCAPIGroup() string {
g := os.Getenv(CAPIGroupEnvVar)
if g == "" {
g = defaultCAPIGroup
}
klog.V(4).Infof("Using API Group %q", g)
return g
}

// newMachineController constructs a controller that watches Nodes,
// Machines and MachineSet as they are added, updated and deleted on
// the cluster.
func newMachineController(
dynamicclient dynamic.Interface,
kubeclient kubeclient.Interface,
discoveryclient discovery.DiscoveryInterface,
) (*machineController, error) {
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeclient, 0)
informerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicclient, 0, metav1.NamespaceAll, nil)

// TODO(alberto): let environment variable to override defaultMachineAPI
machineDeploymentResource, _ := schema.ParseResourceArg(fmt.Sprintf("machinedeployments.%v", defaultMachineAPI))
CAPIGroup := getCAPIGroup()
CAPIVersion, err := getAPIGroupPreferredVersion(discoveryclient, CAPIGroup)
if err != nil {
panic("CAPIVersion")
}
klog.Infof("Using version %q for API group %q", CAPIVersion, CAPIGroup)

machineDeploymentResource, _ := schema.ParseResourceArg(fmt.Sprintf("machinedeployments.%v.%v", CAPIVersion, CAPIGroup))
if machineDeploymentResource == nil {
panic("MachineDeployment")
}

machineSetResource, _ := schema.ParseResourceArg(fmt.Sprintf("machinesets.%v", defaultMachineAPI))
machineSetResource, _ := schema.ParseResourceArg(fmt.Sprintf("machinesets.%v.%v", CAPIVersion, CAPIGroup))
if machineSetResource == nil {
panic("MachineSetResource")
}

machineResource, _ := schema.ParseResourceArg(fmt.Sprintf("machines.%v", defaultMachineAPI))
machineResource, _ := schema.ParseResourceArg(fmt.Sprintf("machines.%v.%v", CAPIVersion, CAPIGroup))
if machineResource == nil {
panic("machineResource")
}

machineInformer := informerFactory.ForResource(*machineResource)
machineSetInformer := informerFactory.ForResource(*machineSetResource)
var machineDeploymentInformer informers.GenericInformer

machineDeploymentInformer = informerFactory.ForResource(*machineDeploymentResource)
machineDeploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{})
machineDeploymentInformer := informerFactory.ForResource(*machineDeploymentResource)

machineInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{})
machineSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{})
machineDeploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{})

nodeInformer := kubeInformerFactory.Core().V1().Nodes().Informer()
nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{})
Expand Down Expand Up @@ -332,6 +357,21 @@ func newMachineController(
}, nil
}

func getAPIGroupPreferredVersion(client discovery.DiscoveryInterface, APIGroup string) (string, error) {
groupList, err := client.ServerGroups()
if err != nil {
return "", fmt.Errorf("failed to get ServerGroups: %v", err)
}

for _, group := range groupList.Groups {
if group.Name == APIGroup {
return group.PreferredVersion.Version, nil
}
}

return "", fmt.Errorf("failed to find API group %q", APIGroup)
}

func (c *machineController) machineSetProviderIDs(machineSet *MachineSet) ([]string, error) {
machines, err := c.machinesInMachineSet(machineSet)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package clusterapi

import (
"fmt"
"os"
"path"
"reflect"
"sort"
Expand All @@ -29,8 +30,10 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
fakediscovery "k8s.io/client-go/discovery/fake"
fakedynamic "k8s.io/client-go/dynamic/fake"
fakekube "k8s.io/client-go/kubernetes/fake"
clientgotesting "k8s.io/client-go/testing"
"k8s.io/utils/pointer"
)

Expand All @@ -53,6 +56,8 @@ type testSpec struct {
rootIsMachineDeployment bool
}

const customCAPIGroup = "custom.x-k8s.io"

func mustCreateTestController(t *testing.T, testConfigs ...*testConfig) (*machineController, testControllerShutdownFunc) {
t.Helper()

Expand All @@ -76,7 +81,19 @@ func mustCreateTestController(t *testing.T, testConfigs ...*testConfig) (*machin

kubeclientSet := fakekube.NewSimpleClientset(nodeObjects...)
dynamicClientset := fakedynamic.NewSimpleDynamicClient(runtime.NewScheme(), machineObjects...)
controller, err := newMachineController(dynamicClientset, kubeclientSet)
discoveryClient := &fakediscovery.FakeDiscovery{
Fake: &clientgotesting.Fake{
Resources: []*v1.APIResourceList{
{
GroupVersion: fmt.Sprintf("%s/v1beta1", customCAPIGroup),
},
{
GroupVersion: fmt.Sprintf("%s/v1alpha3", defaultCAPIGroup),
},
},
},
}
controller, err := newMachineController(dynamicClientset, kubeclientSet, discoveryClient)
if err != nil {
t.Fatal("failed to create test controller")
}
Expand Down Expand Up @@ -136,7 +153,7 @@ func createTestConfigs(specs ...testSpec) []*testConfig {

config.machineSet = &MachineSet{
TypeMeta: v1.TypeMeta{
APIVersion: "cluster.x-k8s.io/v1alpha2",
APIVersion: fmt.Sprintf("%s/v1alpha3", defaultCAPIGroup),
Kind: "MachineSet",
},
ObjectMeta: v1.ObjectMeta{
Expand All @@ -152,7 +169,7 @@ func createTestConfigs(specs ...testSpec) []*testConfig {
} else {
config.machineDeployment = &MachineDeployment{
TypeMeta: v1.TypeMeta{
APIVersion: "cluster.x-k8s.io/v1alpha2",
APIVersion: fmt.Sprintf("%s/v1alpha3", defaultCAPIGroup),
Kind: "MachineDeployment",
},
ObjectMeta: v1.ObjectMeta{
Expand Down Expand Up @@ -211,7 +228,7 @@ func makeLinkedNodeAndMachine(i int, namespace string, owner v1.OwnerReference)

machine := &Machine{
TypeMeta: v1.TypeMeta{
APIVersion: "cluster.x-k8s.io/v1alpha2",
APIVersion: fmt.Sprintf("%s/v1alpha3", defaultCAPIGroup),
Kind: "Machine",
},
ObjectMeta: v1.ObjectMeta{
Expand Down Expand Up @@ -991,3 +1008,121 @@ func TestControllerMachineSetNodeNamesUsingStatusNodeRefName(t *testing.T) {
}
}
}

func TestControllerGetAPIVersionGroup(t *testing.T) {
expected := "mygroup"
if err := os.Setenv(CAPIGroupEnvVar, expected); err != nil {
t.Fatalf("unexpected error: %v", err)
}
observed := getCAPIGroup()
if observed != expected {
t.Fatalf("Wrong Version Group detected, expected %q, got %q", expected, observed)
}

expected = defaultCAPIGroup
if err := os.Setenv(CAPIGroupEnvVar, ""); err != nil {
t.Fatalf("unexpected error: %v", err)
}
observed = getCAPIGroup()
if observed != expected {
t.Fatalf("Wrong Version Group detected, expected %q, got %q", expected, observed)
}
}

func TestControllerGetAPIVersionGroupWithMachineDeployments(t *testing.T) {
testConfig := createMachineDeploymentTestConfig(testNamespace, 1, map[string]string{
nodeGroupMinSizeAnnotationKey: "1",
nodeGroupMaxSizeAnnotationKey: "1",
})
if err := os.Setenv(CAPIGroupEnvVar, customCAPIGroup); err != nil {
t.Fatalf("unexpected error: %v", err)
}

testConfig.machineDeployment.TypeMeta.APIVersion = fmt.Sprintf("%s/v1beta1", customCAPIGroup)
testConfig.machineSet.TypeMeta.APIVersion = fmt.Sprintf("%s/v1beta1", customCAPIGroup)
for _, machine := range testConfig.machines {
machine.TypeMeta.APIVersion = fmt.Sprintf("%s/v1beta1", customCAPIGroup)
}
controller, stop := mustCreateTestController(t, testConfig)
defer stop()

machineDeployments, err := controller.listMachineDeployments(testNamespace, labels.Everything())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if l := len(machineDeployments); l != 1 {
t.Fatalf("Incorrect number of MachineDeployments, expected 1, got %d", l)
}

machineSets, err := controller.listMachineSets(testNamespace, labels.Everything())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if l := len(machineSets); l != 1 {
t.Fatalf("Incorrect number of MachineSets, expected 1, got %d", l)
}

machines, err := controller.listMachines(testNamespace, labels.Everything())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if l := len(machines); l != 1 {
t.Fatalf("Incorrect number of Machines, expected 1, got %d", l)
}

if err := os.Unsetenv(CAPIGroupEnvVar); err != nil {
t.Fatalf("unexpected error: %v", err)
}
}

func TestGetAPIGroupPreferredVersion(t *testing.T) {
testCases := []struct {
description string
APIGroup string
preferredVersion string
error bool
}{
{
description: "find version for default API group",
APIGroup: defaultCAPIGroup,
preferredVersion: "v1alpha3",
error: false,
},
{
description: "find version for another API group",
APIGroup: customCAPIGroup,
preferredVersion: "v1beta1",
error: false,
},
{
description: "API group does not exist",
APIGroup: "does.not.exist",
preferredVersion: "",
error: true,
},
}

discoveryClient := &fakediscovery.FakeDiscovery{
Fake: &clientgotesting.Fake{
Resources: []*v1.APIResourceList{
{
GroupVersion: fmt.Sprintf("%s/v1beta1", customCAPIGroup),
},
{
GroupVersion: fmt.Sprintf("%s/v1alpha3", defaultCAPIGroup),
},
},
},
}
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
version, err := getAPIGroupPreferredVersion(discoveryClient, tc.APIGroup)
if (err != nil) != tc.error {
t.Errorf("expected to have error: %t. Had an error: %t", tc.error, err != nil)
}
if version != tc.preferredVersion {
t.Errorf("expected %v, got: %v", tc.preferredVersion, version)
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -146,12 +147,17 @@ func BuildClusterAPI(opts config.AutoscalingOptions, do cloudprovider.NodeGroupD
klog.Fatalf("could not generate dynamic client for config")
}

kubeclient, err := kubernetes.NewForConfig(externalConfig)
kubeClient, err := kubernetes.NewForConfig(externalConfig)
if err != nil {
klog.Fatalf("create kube clientset failed: %v", err)
}

controller, err := newMachineController(dc, kubeclient)
discoveryClient, err := discovery.NewDiscoveryClientForConfig(externalConfig)
if err != nil {
klog.Fatalf("create discovery client failed: %v", err)
}

controller, err := newMachineController(dc, kubeClient, discoveryClient)
if err != nil {
klog.Fatal(err)
}
Expand Down

0 comments on commit ee627f2

Please sign in to comment.