
scheduler: pass only the node names, fix documentation
With NodeCacheCapable=true in the Extender configuration, only the
node names are passed as arguments and expected in the results. Since
the extender does not need more than that, this mode is preferable
because it is more efficient. Logging is streamlined for that mode.

The install instructions lacked documentation for Kubernetes 1.19 and
for setting up /var/lib/scheduler/scheduler-config.yaml.
pohly committed Sep 25, 2020
1 parent 1790c14 commit c68423d
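
For illustration, this is roughly what the filter request looks like in each mode, built with the same extender API types that pkg/scheduler uses. It is only a sketch: the import path behind the schedulerapi alias is assumed to be k8s.io/kube-scheduler/extender/v1, and the pod and node names are made up.

```go
package main

import (
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	schedulerapi "k8s.io/kube-scheduler/extender/v1"
)

// buildFilterArgs shows the two request shapes: with nodeCacheCapable=true
// kube-scheduler sends only node names, with false it sends full Node objects.
func buildFilterArgs(nodeCacheCapable bool) schedulerapi.ExtenderArgs {
	args := schedulerapi.ExtenderArgs{
		Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "example-pod"}},
	}
	if nodeCacheCapable {
		names := []string{"worker-1", "worker-2"}
		args.NodeNames = &names
	} else {
		args.Nodes = &v1.NodeList{Items: []v1.Node{
			{ObjectMeta: metav1.ObjectMeta{Name: "worker-1"}},
			{ObjectMeta: metav1.ObjectMeta{Name: "worker-2"}},
		}}
	}
	return args
}

func main() {
	// Print both request bodies to compare their size and shape.
	for _, cached := range []bool{false, true} {
		body, _ := json.Marshal(buildFilterArgs(cached))
		fmt.Printf("nodeCacheCapable=%v: %s\n\n", cached, body)
	}
}
```

The first variant serializes complete Node objects, the second only a short list of strings, which is what makes nodeCacheCapable=true cheaper.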
Showing 4 changed files with 139 additions and 30 deletions.
64 changes: 62 additions & 2 deletions docs/install.md
@@ -710,12 +710,13 @@ trust any root CA. The following kubeadm config file solves
this together with enabling the scheduler configuration by
bind-mounting the root certificate that was used to sign the certificate used
by the scheduler extender into the location where the Go
runtime will find it:
runtime will find it. It works for Kubernetes <= 1.18:

``` ShellSession
$ sudo mkdir -p /var/lib/scheduler/
$ sudo cp _work/pmem-ca/ca.pem /var/lib/scheduler/ca.crt

# https://github.com/kubernetes/kubernetes/blob/52d7614a8ca5b8aebc45333b6dc8fbf86a5e7ddf/staging/src/k8s.io/kube-scheduler/config/v1alpha1/types.go#L38-L107
$ sudo sh -c 'cat >/var/lib/scheduler/scheduler-policy.cfg' <<EOF
{
"kind" : "Policy",
@@ -725,7 +726,7 @@ $ sudo sh -c 'cat >/var/lib/scheduler/scheduler-policy.cfg' <<EOF
"urlPrefix": "https://<service name or IP>:<port>",
"filterVerb": "filter",
"prioritizeVerb": "prioritize",
"nodeCacheCapable": false,
"nodeCacheCapable": true,
"weight": 1,
"managedResources":
[{
@@ -736,6 +737,65 @@ $ sudo sh -c 'cat >/var/lib/scheduler/scheduler-policy.cfg' <<EOF
}
EOF

# https://github.com/kubernetes/kubernetes/blob/52d7614a8ca5b8aebc45333b6dc8fbf86a5e7ddf/staging/src/k8s.io/kube-scheduler/config/v1alpha1/types.go#L38-L107
$ sudo sh -c 'cat >/var/lib/scheduler/scheduler-config.yaml' <<EOF
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
schedulerName: default-scheduler
algorithmSource:
  policy:
    file:
      path: /var/lib/scheduler/scheduler-policy.cfg
clientConnection:
  # This is where kubeadm puts it.
  kubeconfig: /etc/kubernetes/scheduler.conf
EOF

$ cat >kubeadm.config <<EOF
apiVersion: kubeadm.k8s.io/v1beta1
kind: ClusterConfiguration
scheduler:
  extraVolumes:
    - name: config
      hostPath: /var/lib/scheduler
      mountPath: /var/lib/scheduler
      readOnly: true
    - name: cluster-root-ca
      hostPath: /var/lib/scheduler/ca.crt
      mountPath: /etc/ssl/certs/ca.crt
      readOnly: true
  extraArgs:
    config: /var/lib/scheduler/scheduler-config.yaml
EOF

$ kubeadm init --config=kubeadm.config
```
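
Regardless of the Kubernetes version, it can be useful to verify that the scheduler extender answers filter requests before restarting kube-scheduler. This is only a sketch: it reuses the placeholders from the policy above, the pod and node names are made up, and it assumes that the extender serves the filter verb under /filter, does not require a client certificate, and presents a certificate whose host name matches the URL.

``` ShellSession
$ curl --cacert /var/lib/scheduler/ca.crt \
       -H 'Content-Type: application/json' \
       -d '{"pod":{"metadata":{"name":"test-pod"}},"nodenames":["worker-1"]}' \
       'https://<service name or IP>:<port>/filter'
# A pod without PMEM volumes passes all nodes through, so the response
# should list the node as suitable, roughly:
#   {"nodenames":["worker-1"]}
```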

In Kubernetes 1.19, the configuration API of the scheduler
changed. The corresponding commands for Kubernetes >= 1.19 are:

``` ShellSession
$ sudo mkdir -p /var/lib/scheduler/
$ sudo cp _work/pmem-ca/ca.pem /var/lib/scheduler/ca.crt

# https://github.com/kubernetes/kubernetes/blob/1afc53514032a44d091ae4a9f6e092171db9fe10/staging/src/k8s.io/kube-scheduler/config/v1beta1/types.go#L44-L96
$ sudo sh -c 'cat >/var/lib/scheduler/scheduler-config.yaml' <<EOF
apiVersion: kubescheduler.config.k8s.io/v1beta1
kind: KubeSchedulerConfiguration
clientConnection:
  # This is where kubeadm puts it.
  kubeconfig: /etc/kubernetes/scheduler.conf
extenders:
  - urlPrefix: https://<service name or IP>:<port>
    filterVerb: filter
    prioritizeVerb: prioritize
    nodeCacheCapable: true
    weight: 1
    managedResources:
      - name: pmem-csi.intel.com/scheduler
        ignoredByScheduler: true
EOF

$ cat >kubeadm.config <<EOF
apiVersion: kubeadm.k8s.io/v1beta1
kind: ClusterConfiguration
…
67 changes: 45 additions & 22 deletions pkg/scheduler/scheduler.go
@@ -128,7 +128,6 @@ func (s *scheduler) filter(w http.ResponseWriter, r *http.Request) {
s.log.Error(err, "JSON encoding")
w.WriteHeader(http.StatusInternalServerError)
} else {
s.log.V(5).Info("node filter", "result", string(response))
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(response)
@@ -140,16 +139,16 @@ func (s *scheduler) filter(w http.ResponseWriter, r *http.Request) {
// complicated to implement and should better be handled generically for volumes
// in Kubernetes.
func (s *scheduler) doFilter(args schedulerapi.ExtenderArgs) (*schedulerapi.ExtenderFilterResult, error) {
var filteredNodes []v1.Node
var filteredNodes []string
failedNodes := make(schedulerapi.FailedNodesMap)
if args.Pod == nil ||
args.Pod.Name == "" ||
args.Nodes == nil {
(args.NodeNames == nil && args.Nodes == nil) {
return nil, errors.New("incomplete parameters")
}

log := s.log.WithValues("pod", args.Pod.Name)
log.V(5).Info("node filter request", "potential nodes", nodeNames(args.Nodes.Items))
log.V(5).Info("node filter", "request", args)
required, err := s.requiredStorage(args.Pod)
if err != nil {
return nil, fmt.Errorf("checking for unbound volumes: %v", err)
@@ -158,43 +157,58 @@ func (s *scheduler) doFilter(args schedulerapi.ExtenderArgs) (*schedulerapi.Exte

var mutex sync.Mutex
var waitgroup sync.WaitGroup
for _, node := range args.Nodes.Items {
var nodeNames []string
if args.NodeNames != nil {
nodeNames = *args.NodeNames
} else {
// Fallback for Extender.NodeCacheCapable == false:
// not recommended, but may still be used by users who followed the
// PMEM-CSI 0.7 setup instructions.
log.Info("NodeCacheCapable is false in Extender configuration, should be set to true.")
nodeNames = listNodeNames(args.Nodes.Items)
}
for _, nodeName := range nodeNames {
if required == 0 {
// Nothing to check.
filteredNodes = append(filteredNodes, node)
filteredNodes = append(filteredNodes, nodeName)
continue
}

// Check in parallel.
node := node
nodeName := nodeName
waitgroup.Add(1)
go func() {
fits, failReasons, err := s.nodeHasEnoughCapacity(required, node)
fits, failReasons, err := s.nodeHasEnoughCapacity(required, nodeName)
mutex.Lock()
defer mutex.Unlock()
defer waitgroup.Done()
switch {
case fits:
filteredNodes = append(filteredNodes, node)
filteredNodes = append(filteredNodes, nodeName)
case failReasons != nil:
failedNodes[node.Name] = strings.Join(failReasons, ",")
failedNodes[nodeName] = strings.Join(failReasons, ",")
case err != nil:
failedNodes[node.Name] = fmt.Sprintf("checking for capacity: %v", err)
failedNodes[nodeName] = fmt.Sprintf("checking for capacity: %v", err)
}
}()
}
waitgroup.Wait()

log.V(5).Info("node filter result",
"suitable nodes", nodeNames(filteredNodes),
"failed nodes", failedNodes)
return &schedulerapi.ExtenderFilterResult{
Nodes: &v1.NodeList{
Items: filteredNodes,
},
response := &schedulerapi.ExtenderFilterResult{
FailedNodes: failedNodes,
Error: "",
}, nil
}
if args.NodeNames != nil {
response.NodeNames = &filteredNodes
} else {
// fallback response...
response.Nodes = &v1.NodeList{}
for _, node := range filteredNodes {
response.Nodes.Items = append(response.Nodes.Items, getNode(args.Nodes.Items, node))
}
}
log.V(5).Info("node filter", "response", response)
return response, nil
}

// requiredStorage sums up total size of all currently unbound
@@ -262,8 +276,8 @@ func (s *scheduler) requiredStorage(pod *v1.Pod) (int64, error) {

// nodeHasEnoughCapacity determines whether a node has enough storage available. It either returns
// true if yes, a list of explanations why not, or an error if checking failed.
func (s *scheduler) nodeHasEnoughCapacity(required int64, node v1.Node) (bool, []string, error) {
available, err := s.capacity.NodeCapacity(node.Name)
func (s *scheduler) nodeHasEnoughCapacity(required int64, nodeName string) (bool, []string, error) {
available, err := s.capacity.NodeCapacity(nodeName)
if err != nil {
return false, nil, fmt.Errorf("retrieve capacity: %v", err)
}
@@ -279,11 +293,20 @@ func (s *scheduler) nodeHasEnoughCapacity(required int64, node v1.Node) (bool, [
return true, nil, nil
}

func nodeNames(nodes []v1.Node) []string {
func listNodeNames(nodes []v1.Node) []string {
var names []string
for _, node := range nodes {
names = append(names, node.Name)
}
sort.Strings(names)
return names
}

func getNode(nodes []v1.Node, nodeName string) v1.Node {
for _, node := range nodes {
if node.Name == nodeName {
return node
}
}
return v1.Node{}
}
35 changes: 30 additions & 5 deletions pkg/scheduler/scheduler_test.go
@@ -17,6 +17,7 @@ import (
"io/ioutil"
"net/http"
"net/url"
"sort"
"testing"

"github.com/stretchr/testify/assert"
@@ -324,6 +325,8 @@ func TestScheduler(t *testing.T) {
capacity clusterCapacity
// Nodes to check.
nodes []string
// Whether we pass v1.NodeList (false) or slice of node names (true).
nodeCacheCapable bool

// Results
expectedError string
@@ -460,6 +463,18 @@
},
expectedNodes: []string{nodeA, nodeB},
},
"nodeCacheCapable": {
pvcs: []*v1.PersistentVolumeClaim{
unboundPVC,
},
nodes: []string{nodeA, nodeB},
capacity: clusterCapacity{
nodeA: GiG,
nodeB: GiG,
},
nodeCacheCapable: true,
expectedNodes: []string{nodeA, nodeB},
},
"one volume, two nodes, enough capacity on A": {
pvcs: []*v1.PersistentVolumeClaim{
unboundPVC,
@@ -514,10 +529,13 @@
if pod == nil {
pod = makePod(scenario.pvcs, scenario.inline)
}
nodes := makeNodeList(scenario.nodes)
args := schedulerapi.ExtenderArgs{
Pod: pod,
Nodes: nodes,
Pod: pod,
}
if scenario.nodeCacheCapable {
args.NodeNames = &scenario.nodes
} else {
args.Nodes = makeNodeList(scenario.nodes)
}
requestBody, err := json.Marshal(args)
require.NoError(t, err, "marshal request")
@@ -535,9 +553,16 @@
require.NoError(t, err, "unmarshal response")
assert.Equal(t, scenario.expectedError, result.Error)
var names []string
if result.Nodes != nil {
names = nodeNames(result.Nodes.Items)
if scenario.nodeCacheCapable {
if result.NodeNames != nil {
names = *result.NodeNames
}
} else {
if result.Nodes != nil {
names = listNodeNames(result.Nodes.Items)
}
}
sort.Strings(names)
assert.Equal(t, scenario.expectedNodes, names)
failures := scenario.expectedFailures
if failures == nil && scenario.expectedError == "" {
…
3 changes: 2 additions & 1 deletion test/setup-kubernetes.sh
@@ -111,7 +111,7 @@ EOF
"urlPrefix": "https://127.0.0.1:${TEST_SCHEDULER_EXTENDER_NODE_PORT}",
"filterVerb": "filter",
"prioritizeVerb": "prioritize",
"nodeCacheCapable": false,
"nodeCacheCapable": true,
"weight": 1,
"managedResources":
[{
@@ -134,6 +134,7 @@ extenders:
- urlPrefix: https://127.0.0.1:${TEST_SCHEDULER_EXTENDER_NODE_PORT}
  filterVerb: filter
  prioritizeVerb: prioritize
  nodeCacheCapable: true
  weight: 1
  managedResources:
  - name: pmem-csi.intel.com/scheduler
…
