Skip to content

Commit

Permalink
Get Topology Manager policy from configz-endpoint
Browse files Browse the repository at this point in the history
This patch allows nfd-master to pull the config from the Kubernetes configz endpoint.
This update introduces --obtain-kubelet-config=<option> as it provides the option to obtain Kubelet config from configz-endpoint in addition to the already existing way of obtaining kubelet-config-file.

Co-authored-by: Killian Muldoon <[email protected]>
Signed-off-by: Killian Muldoon <[email protected]>
Signed-off-by: Talor Itzhak <[email protected]>
  • Loading branch information
Tal-or and Killian Muldoon committed Oct 6, 2021
1 parent d500d2c commit 18088e7
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 20 deletions.
32 changes: 18 additions & 14 deletions cmd/nfd-topology-updater/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ const (
ProgramName = "nfd-topology-updater"
)

// Options to obtain Kubelet config file
const (
kubeletFile = "kubelet-config-file"
configz = "configz-endpoint"
)

func main() {
flags := flag.NewFlagSet(ProgramName, flag.ExitOnError)

Expand All @@ -58,12 +64,15 @@ func main() {
// Plug klog into grpc logging infrastructure
utils.ConfigureGrpcKlog()

klConfig, err := kubeconf.GetKubeletConfigFromLocalFile(resourcemonitorArgs.KubeletConfigFile)
if err != nil {
klog.Fatalf("error reading kubelet config: %v", err)
}
tmPolicy := string(topologypolicy.DetectTopologyPolicy(klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope))
klog.Infof("detected kubelet Topology Manager policy %q", tmPolicy)
var tmPolicy string
if resourcemonitorArgs.KubeletConfigObtainOpt == kubeletFile {
klConfig, err := kubeconf.GetKubeletConfigFromLocalFile(resourcemonitorArgs.KubeletConfigFile)
if err != nil {
klog.Fatalf("error reading kubelet config: %v", err)
}
tmPolicy = string(topologypolicy.DetectTopologyPolicy(klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope))
klog.Infof("detected kubelet Topology Manager policy %q", tmPolicy)
} // Otherwise get TopologyManagerPolicy from configz-endpoint

// Get new TopologyUpdater instance
instance, err := topology.NewTopologyUpdater(*args, *resourcemonitorArgs, tmPolicy)
Expand Down Expand Up @@ -109,6 +118,7 @@ func initFlags(flagset *flag.FlagSet) (*topology.Args, *resourcemonitor.Args) {
"Time to sleep between CR updates. Non-positive value implies no CR updatation (i.e. infinite sleep). [Default: 60s]")
flagset.StringVar(&resourcemonitorArgs.Namespace, "watch-namespace", "*",
"Namespace to watch pods (for testing/debugging purpose). Use * for all namespaces.")
flagset.StringVar(&resourcemonitorArgs.KubeletConfigObtainOpt, "obtain-kubelet-config", kubeletFile, "Options to obtain Kubelet config file. [Possible arguments: kubelet-config-file/configz-endpoint]")
flagset.StringVar(&resourcemonitorArgs.KubeletConfigFile, "kubelet-config-file", source.VarDir.Path("lib/kubelet/config.yaml"),
"Kubelet config file path.")
flagset.StringVar(&resourcemonitorArgs.PodResourceSocketPath, "podresources-socket", source.VarDir.Path("lib/kubelet/pod-resources/kubelet.sock"),
Expand All @@ -118,13 +128,7 @@ func initFlags(flagset *flag.FlagSet) (*topology.Args, *resourcemonitor.Args) {
flagset.StringVar(&args.ServerNameOverride, "server-name-override", "",
"Hostname expected from server certificate, useful in testing")

initKlogFlags(flagset)
klog.InitFlags(flagset)

return args, resourcemonitorArgs
}

func initKlogFlags(flagset *flag.FlagSet) {
flags := flag.NewFlagSet("klog flags", flag.ExitOnError)
//flags.SetOutput(ioutil.Discard)
klog.InitFlags(flags)
}
}
10 changes: 10 additions & 0 deletions cmd/nfd-topology-updater/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestArgsParse(t *testing.T) {
So(args.Oneshot, ShouldBeTrue)
So(finderArgs.SleepInterval, ShouldEqual, 60*time.Second)
So(finderArgs.KubeletConfigFile, ShouldEqual, "/var/lib/kubelet/config.yaml")
So(finderArgs.KubeletConfigObtainOpt, ShouldEqual, kubeletFile)
So(finderArgs.PodResourceSocketPath, ShouldEqual, "/var/lib/kubelet/pod-resources/kubelet.sock")
})
})
Expand Down Expand Up @@ -100,5 +101,14 @@ func TestArgsParse(t *testing.T) {
So(finderArgs.PodResourceSocketPath, ShouldEqual, "/path/testkubelet.sock")
})
})

Convey("When configz-endpoint specified", func() {
_, finderArgs := parseArgs(flags,
"--obtain-kubelet-config=configz-endpoint")

Convey("obtain-kubelet-config option should be configz-endpoint", func() {
So(finderArgs.KubeletConfigObtainOpt, ShouldEqual, configz)
})
})
})
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/klauspost/cpuid/v2 v2.0.9
github.com/onsi/ginkgo v1.14.0
github.com/onsi/gomega v1.10.1
github.com/pkg/errors v0.9.1
github.com/smartystreets/assertions v1.2.0
github.com/smartystreets/goconvey v1.6.4
github.com/stretchr/testify v1.7.0
Expand Down
6 changes: 5 additions & 1 deletion pkg/apihelper/apihelpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
topologyclientset "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned"
api "k8s.io/api/core/v1"
k8sclient "k8s.io/client-go/kubernetes"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
)

// APIHelpers represents a set of API helpers for Kubernetes
Expand All @@ -47,6 +48,9 @@ type APIHelpers interface {
// GetTopologyClient returns a topologyclientset
GetTopologyClient() (*topologyclientset.Clientset, error)

// GetPod returns the Kubernetes pod in a namepace with a name.
// GetPod returns the Kubernetes pod in a namespace with a name.
GetPod(*k8sclient.Clientset, string, string) (*api.Pod, error)

// GetKubeletConfig returns node's kubelet config file
GetKubeletConfig(c *k8sclient.Clientset, nodeName string) (*kubeletconfig.KubeletConfiguration, error)
}
46 changes: 45 additions & 1 deletion pkg/apihelper/k8shelpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@ import (
"context"
"encoding/json"

topologyclientset "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned"
api "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
k8sclient "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
kubeletconfigscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"

topologyclientset "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned"
"github.com/pkg/errors"
)

// Implements APIHelpers
Expand Down Expand Up @@ -132,3 +137,42 @@ func (h K8sHelpers) GetPod(cli *k8sclient.Clientset, namespace string, podName s

return pod, nil
}

func (h K8sHelpers) GetKubeletConfig(c *k8sclient.Clientset, nodeName string) (*kubeletconfig.KubeletConfiguration, error) {
//request here is the same as: curl https://$APISERVER:6443/api/v1/nodes/$NODE_NAME/proxy/configz
res := c.CoreV1().RESTClient().Get().Resource("nodes").Name(nodeName).SubResource("proxy").Suffix("configz").Do(context.TODO())
return decodeConfigz(&res)
}

// Decodes the rest result from /configz and returns a kubeletconfig.KubeletConfiguration (internal type).
func decodeConfigz(res *restclient.Result) (*kubeletconfig.KubeletConfiguration, error) {
// This hack because /configz reports the following structure:
// {"kubeletconfig": {the JSON representation of kubeletconfigv1beta1.KubeletConfiguration}}
type configzWrapper struct {
ComponentConfig kubeletconfigv1beta1.KubeletConfiguration `json:"kubeletconfig"`
}

configz := configzWrapper{}
kubeCfg := kubeletconfig.KubeletConfiguration{}

contentsBytes, err := res.Raw()
if err != nil {
return nil, err
}

err = json.Unmarshal(contentsBytes, &configz)
if err != nil {
return nil, errors.Wrap(err, string(contentsBytes))
}

scheme, _, err := kubeletconfigscheme.NewSchemeAndCodecs()
if err != nil {
return nil, err
}
err = scheme.Convert(&configz.ComponentConfig, &kubeCfg, nil)
if err != nil {
return nil, err
}

return &kubeCfg, nil
}
24 changes: 24 additions & 0 deletions pkg/apihelper/mock_APIHelpers.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions pkg/nfd-master/nfd-master.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,21 @@ func (m *nfdMaster) UpdateNodeTopology(c context.Context, r *topologypb.NodeTopo
} else {
klog.Infof("received CR updation request for node %q", r.NodeName)
}
if len(r.TopologyPolicies[0]) == 0 {
klog.Warningf("Using configz-endpoint in order to get Kubelet configuration, consider to be unstable")
cli, err := m.apihelper.GetClient()
if err != nil {
klog.Errorf("%v", err.Error())
return &topologypb.NodeTopologyResponse{}, err
}
kc, err := m.apihelper.GetKubeletConfig(cli, r.NodeName)
if err != nil {
klog.Errorf("failed to get Kubelet config: %v", err.Error())
return &topologypb.NodeTopologyResponse{}, err
}
r.TopologyPolicies[0] = kc.TopologyManagerPolicy
klog.Infof("detected topology policy: %q", kc.TopologyManagerPolicy)
}
if !m.args.NoPublish {
err := m.updateCR(r.NodeName, r.TopologyPolicies, r.Zones, m.args.NRTNamespace)
if err != nil {
Expand Down
9 changes: 5 additions & 4 deletions pkg/resourcemonitor/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ import (

// Args stores commandline arguments used for resource monitoring
type Args struct {
PodResourceSocketPath string
SleepInterval time.Duration
Namespace string
KubeletConfigFile string
PodResourceSocketPath string
SleepInterval time.Duration
Namespace string
KubeletConfigFile string
KubeletConfigObtainOpt string
}

// ResourceInfo stores information of resources and their corresponding IDs obtained from PodResource API
Expand Down

0 comments on commit 18088e7

Please sign in to comment.