Skip to content

Commit

Permalink
Merge pull request #4276 from tstromberg/proxy-restart2
Browse files Browse the repository at this point in the history
Restart kube-proxy using kubeadm & add bootstrapper.WaitCluster
  • Loading branch information
tstromberg authored May 17, 2019
2 parents 146499e + 2ea63fb commit af049bd
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 171 deletions.
35 changes: 3 additions & 32 deletions cmd/minikube/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/blang/semver"
"github.com/docker/machine/libmachine"
"github.com/docker/machine/libmachine/host"
"github.com/docker/machine/libmachine/state"
"github.com/golang/glog"
"github.com/google/go-containerregistry/pkg/authn"
"github.com/google/go-containerregistry/pkg/name"
Expand Down Expand Up @@ -244,9 +243,6 @@ func runStart(cmd *cobra.Command, args []string) {
// The kube config must be update must come before bootstrapping, otherwise health checks may use a stale IP
kubeconfig := updateKubeConfig(host, &config)
bootstrapCluster(bs, cr, runner, config.KubernetesConfig, preexisting, isUpgrade)

apiserverPort := config.KubernetesConfig.NodePort
validateCluster(bs, cr, runner, ip, apiserverPort)
configureMounts()
if err = LoadCachedImagesInConfigFile(); err != nil {
console.Failure("Unable to load cached images from config file.")
Expand All @@ -257,6 +253,9 @@ func runStart(cmd *cobra.Command, args []string) {
prepareNone()
}

if err := bs.WaitCluster(config.KubernetesConfig); err != nil {
exit.WithError("Wait failed", err)
}
showKubectlConnectInfo(kubeconfig)

}
Expand Down Expand Up @@ -668,34 +667,6 @@ func bootstrapCluster(bs bootstrapper.Bootstrapper, r cruntime.Manager, runner b
}
}

// validateCluster validates that the cluster is well-configured and healthy
func validateCluster(bs bootstrapper.Bootstrapper, r cruntime.Manager, runner bootstrapper.CommandRunner, ip string, apiserverPort int) {
k8sStat := func() (err error) {
st, err := bs.GetKubeletStatus()
if err != nil || st != state.Running.String() {
return &pkgutil.RetriableError{Err: fmt.Errorf("kubelet unhealthy: %v: %s", err, st)}
}
return nil
}
err := pkgutil.RetryAfter(20, k8sStat, 3*time.Second)
if err != nil {
exit.WithLogEntries("kubelet checks failed", err, logs.FindProblems(r, bs, runner))
}
aStat := func() (err error) {
st, err := bs.GetAPIServerStatus(net.ParseIP(ip), apiserverPort)
if err != nil || st != state.Running.String() {
return &pkgutil.RetriableError{Err: fmt.Errorf("apiserver status=%s err=%v", st, err)}
}
return nil
}

err = pkgutil.RetryAfter(30, aStat, 10*time.Second)
if err != nil {
exit.WithLogEntries("apiserver checks failed", err, logs.FindProblems(r, bs, runner))
}
console.OutLn("")
}

// configureMounts configures any requested filesystem mounts
func configureMounts() {
if !viper.GetBool(createMount) {
Expand Down
1 change: 1 addition & 0 deletions pkg/minikube/bootstrapper/bootstrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Bootstrapper interface {
UpdateCluster(config.KubernetesConfig) error
RestartCluster(config.KubernetesConfig) error
DeleteCluster(config.KubernetesConfig) error
WaitCluster(config.KubernetesConfig) error
// LogCommands returns a map of log type to a command which will display that log.
LogCommands(LogOptions) map[string]string
SetupCerts(cfg config.KubernetesConfig) error
Expand Down
79 changes: 40 additions & 39 deletions pkg/minikube/bootstrapper/kubeadm/kubeadm.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/minikube/pkg/minikube/assets"
"k8s.io/minikube/pkg/minikube/bootstrapper"
"k8s.io/minikube/pkg/minikube/config"
Expand Down Expand Up @@ -70,7 +71,6 @@ type pod struct {

// PodsByLayer are queries we run when health checking, sorted roughly by dependency layer
var PodsByLayer = []pod{
{"apiserver", "component", "kube-apiserver"},
{"proxy", "k8s-app", "kube-proxy"},
{"etcd", "component", "etcd"},
{"scheduler", "component", "kube-scheduler"},
Expand Down Expand Up @@ -214,20 +214,10 @@ func (k *Bootstrapper) StartCluster(k8s config.KubernetesConfig) error {
}
}

if err := waitForPods(k8s, false); err != nil {
return errors.Wrap(err, "wait")
}

glog.Infof("Configuring cluster permissions ...")
if err := util.RetryAfter(100, elevateKubeSystemPrivileges, time.Millisecond*500); err != nil {
return errors.Wrap(err, "timed out waiting to elevate kube-system RBAC privileges")
}

// Make sure elevating privileges didn't screw anything up
if err := waitForPods(k8s, true); err != nil {
return errors.Wrap(err, "wait")
}

return nil
}

Expand Down Expand Up @@ -260,37 +250,37 @@ func addAddons(files *[]assets.CopyableFile, data interface{}) error {
return nil
}

// waitForPods waits until the important Kubernetes pods are in running state
func waitForPods(k8s config.KubernetesConfig, quiet bool) error {
// WaitCluster blocks until Kubernetes appears to be healthy.
func (k *Bootstrapper) WaitCluster(k8s config.KubernetesConfig) error {
// Do not wait for "k8s-app" pods in the case of CNI, as they are managed
// by a CNI plugin which is usually started after minikube has been brought
// up. Otherwise, minikube won't start, as "k8s-app" pods are not ready.
componentsOnly := k8s.NetworkPlugin == "cni"

if !quiet {
console.OutStyle("waiting-pods", "Waiting for:")
}
console.OutStyle("waiting-pods", "Verifying:")
client, err := util.GetClient()
if err != nil {
return errors.Wrap(err, "k8s client")
}

// Wait until the apiserver can answer queries properly. We don't care if the apiserver
// pod shows up as registered, but need the webserver for all subsequent queries.
console.Out(" apiserver")
if err := k.waitForAPIServer(k8s); err != nil {
return errors.Wrap(err, "waiting for apiserver")
}

for _, p := range PodsByLayer {
if componentsOnly && p.key != "component" {
continue
}

if !quiet {
console.Out(" %s", p.name)
}
console.Out(" %s", p.name)
selector := labels.SelectorFromSet(labels.Set(map[string]string{p.key: p.value}))
if err := util.WaitForPodsWithLabelRunning(client, "kube-system", selector); err != nil {
return errors.Wrap(err, fmt.Sprintf("waiting for %s=%s", p.key, p.value))
}
}
if !quiet {
console.OutLn("")
}
console.OutLn("")
return nil
}

Expand All @@ -308,11 +298,13 @@ func (k *Bootstrapper) RestartCluster(k8s config.KubernetesConfig) error {
controlPlane = "control-plane"
}

configPath := constants.KubeadmConfigFile
baseCmd := fmt.Sprintf("sudo kubeadm %s", phase)
cmds := []string{
fmt.Sprintf("sudo kubeadm %s phase certs all --config %s", phase, constants.KubeadmConfigFile),
fmt.Sprintf("sudo kubeadm %s phase kubeconfig all --config %s", phase, constants.KubeadmConfigFile),
fmt.Sprintf("sudo kubeadm %s phase %s all --config %s", phase, controlPlane, constants.KubeadmConfigFile),
fmt.Sprintf("sudo kubeadm %s phase etcd local --config %s", phase, constants.KubeadmConfigFile),
fmt.Sprintf("%s phase certs all --config %s", baseCmd, configPath),
fmt.Sprintf("%s phase kubeconfig all --config %s", baseCmd, configPath),
fmt.Sprintf("%s phase %s all --config %s", baseCmd, controlPlane, configPath),
fmt.Sprintf("%s phase etcd local --config %s", baseCmd, configPath),
}

// Run commands one at a time so that it is easier to root cause failures.
Expand All @@ -322,23 +314,32 @@ func (k *Bootstrapper) RestartCluster(k8s config.KubernetesConfig) error {
}
}

if err := waitForPods(k8s, false); err != nil {
return errors.Wrap(err, "wait")
if err := k.waitForAPIServer(k8s); err != nil {
return errors.Wrap(err, "waiting for apiserver")
}

console.OutStyle("reconfiguring", "Updating kube-proxy configuration ...")
if err = util.RetryAfter(5, func() error { return updateKubeProxyConfigMap(k8s) }, 5*time.Second); err != nil {
return errors.Wrap(err, "restarting kube-proxy")
// restart the proxy and coredns
if err := k.c.Run(fmt.Sprintf("%s phase addon all --config %s", baseCmd, configPath)); err != nil {
return errors.Wrapf(err, "addon phase")
}

// Make sure the kube-proxy restart didn't screw anything up.
if err := waitForPods(k8s, true); err != nil {
return errors.Wrap(err, "wait")
}

return nil
}

// waitForAPIServer waits for the apiserver to start up
func (k *Bootstrapper) waitForAPIServer(k8s config.KubernetesConfig) error {
glog.Infof("Waiting for apiserver ...")
return wait.PollImmediate(time.Millisecond*200, time.Minute*1, func() (bool, error) {
status, err := k.GetAPIServerStatus(net.ParseIP(k8s.NodeIP), k8s.NodePort)
glog.Infof("status: %s, err: %v", status, err)
if err != nil {
return false, err
}
if status != "Running" {
return false, nil
}
return true, nil
})
}

// DeleteCluster removes the components that were started earlier
func (k *Bootstrapper) DeleteCluster(k8s config.KubernetesConfig) error {
cmd := fmt.Sprintf("sudo kubeadm reset --force")
Expand Down
100 changes: 0 additions & 100 deletions pkg/minikube/bootstrapper/kubeadm/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,17 @@ limitations under the License.
package kubeadm

import (
"bytes"
"encoding/json"
"html/template"
"net"
"strings"

"github.com/golang/glog"
"github.com/pkg/errors"
core "k8s.io/api/core/v1"
rbac "k8s.io/api/rbac/v1beta1"
apierr "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/minikube/pkg/minikube/config"
"k8s.io/minikube/pkg/minikube/constants"
"k8s.io/minikube/pkg/minikube/service"
"k8s.io/minikube/pkg/util"
Expand Down Expand Up @@ -130,98 +125,3 @@ func elevateKubeSystemPrivileges() error {
}
return nil
}

const (
kubeconfigConf = "kubeconfig.conf"
kubeProxyConfigmapTmpl = `apiVersion: v1
kind: Config
clusters:
- cluster:
certificate-authority: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
server: https://{{.AdvertiseAddress}}:{{.APIServerPort}}
name: default
contexts:
- context:
cluster: default
namespace: default
user: default
name: default
current-context: default
users:
- name: default
user:
tokenFile: /var/run/secrets/kubernetes.io/serviceaccount/token
`
)

// updateKubeProxyConfigMap updates the IP & port kube-proxy listens on, and restarts it.
func updateKubeProxyConfigMap(k8s config.KubernetesConfig) error {
client, err := util.GetClient()
if err != nil {
return errors.Wrap(err, "getting k8s client")
}

selector := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": "kube-proxy"}))
if err := util.WaitForPodsWithLabelRunning(client, "kube-system", selector); err != nil {
return errors.Wrap(err, "kube-proxy not running")
}

cfgMap, err := client.CoreV1().ConfigMaps("kube-system").Get("kube-proxy", meta.GetOptions{})
if err != nil {
return &util.RetriableError{Err: errors.Wrap(err, "getting kube-proxy configmap")}
}
glog.Infof("kube-proxy config: %v", cfgMap.Data[kubeconfigConf])
t := template.Must(template.New("kubeProxyTmpl").Parse(kubeProxyConfigmapTmpl))
opts := struct {
AdvertiseAddress string
APIServerPort int
}{
AdvertiseAddress: k8s.NodeIP,
APIServerPort: k8s.NodePort,
}

kubeconfig := bytes.Buffer{}
if err := t.Execute(&kubeconfig, opts); err != nil {
return errors.Wrap(err, "executing kube proxy configmap template")
}

if cfgMap.Data == nil {
cfgMap.Data = map[string]string{}
}

updated := strings.TrimSuffix(kubeconfig.String(), "\n")
glog.Infof("updated kube-proxy config: %s", updated)

// An optimization, but also one that's unlikely, as kubeadm writes the address as 'localhost'
if cfgMap.Data[kubeconfigConf] == updated {
glog.Infof("kube-proxy config appears to require no change, not restarting kube-proxy")
return nil
}
cfgMap.Data[kubeconfigConf] = updated

// Make this step retriable, as it can fail with:
// "Operation cannot be fulfilled on configmaps "kube-proxy": the object has been modified; please apply your changes to the latest version and try again"
if _, err := client.CoreV1().ConfigMaps("kube-system").Update(cfgMap); err != nil {
return &util.RetriableError{Err: errors.Wrap(err, "updating configmap")}
}

pods, err := client.CoreV1().Pods("kube-system").List(meta.ListOptions{
LabelSelector: "k8s-app=kube-proxy",
})
if err != nil {
return errors.Wrap(err, "listing kube-proxy pods")
}
for _, pod := range pods.Items {
// Retriable, as known to fail with: pods "<name>" not found
if err := client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &meta.DeleteOptions{}); err != nil {
return &util.RetriableError{Err: errors.Wrapf(err, "deleting pod %+v", pod)}
}
}

// Wait for the scheduler to restart kube-proxy
if err := util.WaitForPodsWithLabelRunning(client, "kube-system", selector); err != nil {
return errors.Wrap(err, "kube-proxy not running")
}

return nil
}

0 comments on commit af049bd

Please sign in to comment.