Skip to content

Commit

Permalink
Add the ability to override CAPI group via env variable and discover …
Browse files Browse the repository at this point in the history
…API version.

This change adds detection for an environment variable to specify the group for the clusterapi resources. If the environment
variable `CAPI_GROUP` is specified, then it will
be used instead of the default.
This also decouples the API group from the version and let the latter to be discovered dynamically.
  • Loading branch information
elmiko authored and xmudrii committed Apr 30, 2020
1 parent 9f9e6ef commit 4e108b7
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ package clusterapi
import (
"context"
"fmt"
"os"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
Expand All @@ -38,7 +40,9 @@ import (
const (
machineProviderIDIndex = "machineProviderIDIndex"
nodeProviderIDIndex = "nodeProviderIDIndex"
defaultMachineAPI = "v1alpha2.cluster.x-k8s.io"
defaultCAPIGroup = "cluster.x-k8s.io"
// CAPIGroupEnvVar contains the environment variable name which allows overriding defaultCAPIGroup.
CAPIGroupEnvVar = "CAPI_GROUP"
)

// machineController watches for Nodes, Machines, MachineSets and
Expand Down Expand Up @@ -271,37 +275,58 @@ func (c *machineController) machinesInMachineSet(machineSet *MachineSet) ([]*Mac
return result, nil
}

// getCAPIGroup returns a string that specifies the group for the API.
// It will return either the value from the
// CAPI_GROUP environment variable, or the default value i.e cluster.x-k8s.io.
func getCAPIGroup() string {
g := os.Getenv(CAPIGroupEnvVar)
if g == "" {
g = defaultCAPIGroup
}
klog.V(4).Infof("Using API Group %q", g)
return g
}

// newMachineController constructs a controller that watches Nodes,
// Machines and MachineSet as they are added, updated and deleted on
// the cluster.
func newMachineController(
dynamicclient dynamic.Interface,
kubeclient kubeclient.Interface,
discoveryclient discovery.DiscoveryInterface,
) (*machineController, error) {
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeclient, 0)
informerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicclient, 0, metav1.NamespaceAll, nil)

// TODO(alberto): let environment variable to override defaultMachineAPI
machineDeploymentResource, _ := schema.ParseResourceArg(fmt.Sprintf("machinedeployments.%v", defaultMachineAPI))
CAPIGroup := getCAPIGroup()
CAPIVersion, err := getAPIGroupPreferredVersion(discoveryclient, CAPIGroup)
if err != nil {
panic("CAPIVersion")
}
klog.Infof("Using version %q for API group %q", CAPIVersion, CAPIGroup)

machineDeploymentResource, _ := schema.ParseResourceArg(fmt.Sprintf("machinedeployments.%v.%v", CAPIVersion, CAPIGroup))
if machineDeploymentResource == nil {
panic("MachineDeployment")
}

machineSetResource, _ := schema.ParseResourceArg(fmt.Sprintf("machinesets.%v", defaultMachineAPI))
machineSetResource, _ := schema.ParseResourceArg(fmt.Sprintf("machinesets.%v.%v", CAPIVersion, CAPIGroup))
if machineSetResource == nil {
panic("MachineSetResource")
}

machineResource, _ := schema.ParseResourceArg(fmt.Sprintf("machines.%v", defaultMachineAPI))
machineResource, _ := schema.ParseResourceArg(fmt.Sprintf("machines.%v.%v", CAPIVersion, CAPIGroup))
if machineResource == nil {
panic("machineResource")
}

machineInformer := informerFactory.ForResource(*machineResource)
machineSetInformer := informerFactory.ForResource(*machineSetResource)
var machineDeploymentInformer informers.GenericInformer

machineDeploymentInformer = informerFactory.ForResource(*machineDeploymentResource)
machineDeploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{})
machineDeploymentInformer := informerFactory.ForResource(*machineDeploymentResource)

machineInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{})
machineSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{})
machineDeploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{})

nodeInformer := kubeInformerFactory.Core().V1().Nodes().Informer()
nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{})
Expand Down Expand Up @@ -332,6 +357,21 @@ func newMachineController(
}, nil
}

func getAPIGroupPreferredVersion(client discovery.DiscoveryInterface, APIGroup string) (string, error) {
groupList, err := client.ServerGroups()
if err != nil {
return "", fmt.Errorf("failed to get ServerGroups: %v", err)
}

for _, group := range groupList.Groups {
if group.Name == APIGroup {
return group.PreferredVersion.Version, nil
}
}

return "", fmt.Errorf("failed to find API group %q", APIGroup)
}

func (c *machineController) machineSetProviderIDs(machineSet *MachineSet) ([]string, error) {
machines, err := c.machinesInMachineSet(machineSet)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package clusterapi

import (
"fmt"
"os"
"path"
"reflect"
"sort"
Expand All @@ -29,8 +30,10 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
fakediscovery "k8s.io/client-go/discovery/fake"
fakedynamic "k8s.io/client-go/dynamic/fake"
fakekube "k8s.io/client-go/kubernetes/fake"
clientgotesting "k8s.io/client-go/testing"
"k8s.io/utils/pointer"
)

Expand All @@ -53,6 +56,8 @@ type testSpec struct {
rootIsMachineDeployment bool
}

const customCAPIGroup = "custom.x-k8s.io"

func mustCreateTestController(t *testing.T, testConfigs ...*testConfig) (*machineController, testControllerShutdownFunc) {
t.Helper()

Expand All @@ -76,7 +81,19 @@ func mustCreateTestController(t *testing.T, testConfigs ...*testConfig) (*machin

kubeclientSet := fakekube.NewSimpleClientset(nodeObjects...)
dynamicClientset := fakedynamic.NewSimpleDynamicClient(runtime.NewScheme(), machineObjects...)
controller, err := newMachineController(dynamicClientset, kubeclientSet)
discoveryClient := &fakediscovery.FakeDiscovery{
Fake: &clientgotesting.Fake{
Resources: []*v1.APIResourceList{
{
GroupVersion: fmt.Sprintf("%s/v1beta1", customCAPIGroup),
},
{
GroupVersion: fmt.Sprintf("%s/v1alpha3", defaultCAPIGroup),
},
},
},
}
controller, err := newMachineController(dynamicClientset, kubeclientSet, discoveryClient)
if err != nil {
t.Fatal("failed to create test controller")
}
Expand Down Expand Up @@ -136,7 +153,7 @@ func createTestConfigs(specs ...testSpec) []*testConfig {

config.machineSet = &MachineSet{
TypeMeta: v1.TypeMeta{
APIVersion: "cluster.x-k8s.io/v1alpha2",
APIVersion: fmt.Sprintf("%s/v1alpha3", defaultCAPIGroup),
Kind: "MachineSet",
},
ObjectMeta: v1.ObjectMeta{
Expand All @@ -152,7 +169,7 @@ func createTestConfigs(specs ...testSpec) []*testConfig {
} else {
config.machineDeployment = &MachineDeployment{
TypeMeta: v1.TypeMeta{
APIVersion: "cluster.x-k8s.io/v1alpha2",
APIVersion: fmt.Sprintf("%s/v1alpha3", defaultCAPIGroup),
Kind: "MachineDeployment",
},
ObjectMeta: v1.ObjectMeta{
Expand Down Expand Up @@ -211,7 +228,7 @@ func makeLinkedNodeAndMachine(i int, namespace string, owner v1.OwnerReference)

machine := &Machine{
TypeMeta: v1.TypeMeta{
APIVersion: "cluster.x-k8s.io/v1alpha2",
APIVersion: fmt.Sprintf("%s/v1alpha3", defaultCAPIGroup),
Kind: "Machine",
},
ObjectMeta: v1.ObjectMeta{
Expand Down Expand Up @@ -991,3 +1008,121 @@ func TestControllerMachineSetNodeNamesUsingStatusNodeRefName(t *testing.T) {
}
}
}

func TestControllerGetAPIVersionGroup(t *testing.T) {
expected := "mygroup"
if err := os.Setenv(CAPIGroupEnvVar, expected); err != nil {
t.Fatalf("unexpected error: %v", err)
}
observed := getCAPIGroup()
if observed != expected {
t.Fatalf("Wrong Version Group detected, expected %q, got %q", expected, observed)
}

expected = defaultCAPIGroup
if err := os.Setenv(CAPIGroupEnvVar, ""); err != nil {
t.Fatalf("unexpected error: %v", err)
}
observed = getCAPIGroup()
if observed != expected {
t.Fatalf("Wrong Version Group detected, expected %q, got %q", expected, observed)
}
}

func TestControllerGetAPIVersionGroupWithMachineDeployments(t *testing.T) {
testConfig := createMachineDeploymentTestConfig(testNamespace, 1, map[string]string{
nodeGroupMinSizeAnnotationKey: "1",
nodeGroupMaxSizeAnnotationKey: "1",
})
if err := os.Setenv(CAPIGroupEnvVar, customCAPIGroup); err != nil {
t.Fatalf("unexpected error: %v", err)
}

testConfig.machineDeployment.TypeMeta.APIVersion = fmt.Sprintf("%s/v1beta1", customCAPIGroup)
testConfig.machineSet.TypeMeta.APIVersion = fmt.Sprintf("%s/v1beta1", customCAPIGroup)
for _, machine := range testConfig.machines {
machine.TypeMeta.APIVersion = fmt.Sprintf("%s/v1beta1", customCAPIGroup)
}
controller, stop := mustCreateTestController(t, testConfig)
defer stop()

machineDeployments, err := controller.listMachineDeployments(testNamespace, labels.Everything())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if l := len(machineDeployments); l != 1 {
t.Fatalf("Incorrect number of MachineDeployments, expected 1, got %d", l)
}

machineSets, err := controller.listMachineSets(testNamespace, labels.Everything())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if l := len(machineSets); l != 1 {
t.Fatalf("Incorrect number of MachineSets, expected 1, got %d", l)
}

machines, err := controller.listMachines(testNamespace, labels.Everything())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if l := len(machines); l != 1 {
t.Fatalf("Incorrect number of Machines, expected 1, got %d", l)
}

if err := os.Unsetenv(CAPIGroupEnvVar); err != nil {
t.Fatalf("unexpected error: %v", err)
}
}

func TestGetAPIGroupPreferredVersion(t *testing.T) {
testCases := []struct {
description string
APIGroup string
preferredVersion string
error bool
}{
{
description: "find version for default API group",
APIGroup: defaultCAPIGroup,
preferredVersion: "v1alpha3",
error: false,
},
{
description: "find version for another API group",
APIGroup: customCAPIGroup,
preferredVersion: "v1beta1",
error: false,
},
{
description: "API group does not exist",
APIGroup: "does.not.exist",
preferredVersion: "",
error: true,
},
}

discoveryClient := &fakediscovery.FakeDiscovery{
Fake: &clientgotesting.Fake{
Resources: []*v1.APIResourceList{
{
GroupVersion: fmt.Sprintf("%s/v1beta1", customCAPIGroup),
},
{
GroupVersion: fmt.Sprintf("%s/v1alpha3", defaultCAPIGroup),
},
},
},
}
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
version, err := getAPIGroupPreferredVersion(discoveryClient, tc.APIGroup)
if (err != nil) != tc.error {
t.Errorf("expected to have error: %t. Had an error: %t", tc.error, err != nil)
}
if version != tc.preferredVersion {
t.Errorf("expected %v, got: %v", tc.preferredVersion, version)
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -146,12 +147,17 @@ func BuildClusterAPI(opts config.AutoscalingOptions, do cloudprovider.NodeGroupD
klog.Fatalf("could not generate dynamic client for config")
}

kubeclient, err := kubernetes.NewForConfig(externalConfig)
kubeClient, err := kubernetes.NewForConfig(externalConfig)
if err != nil {
klog.Fatalf("create kube clientset failed: %v", err)
}

controller, err := newMachineController(dc, kubeclient)
discoveryClient, err := discovery.NewDiscoveryClientForConfig(externalConfig)
if err != nil {
klog.Fatalf("create discovery client failed: %v", err)
}

controller, err := newMachineController(dc, kubeClient, discoveryClient)
if err != nil {
klog.Fatal(err)
}
Expand Down

0 comments on commit 4e108b7

Please sign in to comment.