Better support for OpenShift single node #213

Merged
7 changes: 7 additions & 0 deletions controllers/sriovoperatorconfig_controller.go
@@ -78,6 +78,11 @@ func (r *SriovOperatorConfigReconciler) Reconcile(ctx context.Context, req ctrl.
Name: constants.DEFAULT_CONFIG_NAME, Namespace: namespace}, defaultConfig)
if err != nil {
if errors.IsNotFound(err) {
singleNode, err := utils.IsSingleNodeCluster(r.Client)
if err != nil {
return reconcile.Result{}, fmt.Errorf("Couldn't check the number of nodes in the cluster: %v", err)
}

// Default Config object not found, create it.
defaultConfig.SetNamespace(namespace)
defaultConfig.SetName(constants.DEFAULT_CONFIG_NAME)
@@ -86,7 +91,9 @@ func (r *SriovOperatorConfigReconciler) Reconcile(ctx context.Context, req ctrl.
EnableOperatorWebhook: func() *bool { b := enableAdmissionController; return &b }(),
ConfigDaemonNodeSelector: map[string]string{},
LogLevel: 2,
DisableDrain: singleNode,
}

err = r.Create(context.TODO(), defaultConfig)
if err != nil {
logger.Error(err, "Failed to create default Operator Config", "Namespace",
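For illustration only: the DisableDrain field that this controller now sets on the default config can also be flipped back at runtime with a controller-runtime client. The helper below is a hypothetical sketch, not code from this PR; the context, client, and namespace plumbing (and the import set, which mirrors the packages used in the controller above) are assumptions.

// enableDrain is a hypothetical helper (not part of this PR) showing how the
// DisableDrain field on the default SriovOperatorConfig can be switched back
// off, for example after a single-node cluster gains additional nodes.
// Assumed imports: context, k8s.io/apimachinery/pkg/types,
// sigs.k8s.io/controller-runtime/pkg/client, plus the operator's own
// sriovnetworkv1 and constants packages referenced in the diff above.
func enableDrain(ctx context.Context, c client.Client, namespace string) error {
	cfg := &sriovnetworkv1.SriovOperatorConfig{}
	key := types.NamespacedName{Name: constants.DEFAULT_CONFIG_NAME, Namespace: namespace}
	if err := c.Get(ctx, key, cfg); err != nil {
		return err
	}
	cfg.Spec.DisableDrain = false
	return c.Update(ctx, cfg)
}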
7 changes: 7 additions & 0 deletions main.go
@@ -232,13 +232,20 @@ func createDefaultOperatorConfig(cfg *rest.Config) error {
if err != nil {
return fmt.Errorf("Couldn't create client: %v", err)
}

singleNode, err := utils.IsSingleNodeCluster(c)
if err != nil {
return fmt.Errorf("Couldn't check the anount of nodes in the cluster")
}

enableAdmissionController := os.Getenv("ENABLE_ADMISSION_CONTROLLER") == "true"
config := &sriovnetworkv1.SriovOperatorConfig{
Spec: sriovnetworkv1.SriovOperatorConfigSpec{
EnableInjector: func() *bool { b := enableAdmissionController; return &b }(),
EnableOperatorWebhook: func() *bool { b := enableAdmissionController; return &b }(),
ConfigDaemonNodeSelector: map[string]string{},
LogLevel: 2,
DisableDrain: singleNode,
},
}
name := "default"
188 changes: 106 additions & 82 deletions pkg/daemon/daemon.go
@@ -497,9 +497,26 @@ func (dn *Daemon) nodeStateSyncHandler(generation int64) error {
}
}

if reqDrain && !dn.disableDrain {
if reqDrain {
if !dn.disableDrain {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

glog.Infof("nodeStateSyncHandler(): get drain lock for sriov daemon")
done := make(chan bool)
go dn.getDrainLock(ctx, done)
<-done
}

if utils.ClusterType == utils.ClusterTypeOpenshift {
glog.Infof("nodeStateSyncHandler(): pause MCP")
if err := dn.pauseMCP(); err != nil {
return err
}
}

glog.Info("nodeStateSyncHandler(): drain node")
if err := dn.drainNode(dn.name); err != nil {
if err := dn.drainNode(); err != nil {
return err
}
}
@@ -551,8 +568,10 @@ func (dn *Daemon) nodeStateSyncHandler(generation int64) error {
}

func (dn *Daemon) completeDrain() error {
if err := drain.RunCordonOrUncordon(dn.drainer, dn.node, false); err != nil {
return err
if !dn.disableDrain {
if err := drain.RunCordonOrUncordon(dn.drainer, dn.node, false); err != nil {
return err
}
}

if utils.ClusterType == utils.ClusterTypeOpenshift {
Expand Down Expand Up @@ -732,7 +751,7 @@ func (dn *Daemon) annotateNode(node, value string) error {
func (dn *Daemon) getNodeMachinePool() error {
desiredConfig, ok := dn.node.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey]
if !ok {
glog.Error("getNodeMachinePool(): Failed to find the the desiredConfig Annotation")
glog.Errorf("getNodeMachinePool(): Failed to find the the desiredConfig Annotation")
return fmt.Errorf("getNodeMachinePool(): Failed to find the the desiredConfig Annotation")
}
mc, err := dn.mcClient.MachineconfigurationV1().MachineConfigs().Get(context.TODO(), desiredConfig, metav1.GetOptions{})
Expand Down Expand Up @@ -801,99 +820,104 @@ func (dn *Daemon) getDrainLock(ctx context.Context, done chan bool) {
})
}

func (dn *Daemon) drainNode(name string) error {
glog.Info("drainNode(): Update prepared")
func (dn *Daemon) pauseMCP() error {
glog.Info("pauseMCP(): pausing MCP")
var err error

mcpInformerFactory := mcfginformers.NewSharedInformerFactory(dn.mcClient,
time.Second*30,
)
mcpInformer := mcpInformerFactory.Machineconfiguration().V1().MachineConfigPools().Informer()

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
paused := dn.node.Annotations[annoKey] == annoMcpPaused

done := make(chan bool)
go dn.getDrainLock(ctx, done)
<-done

if utils.ClusterType == utils.ClusterTypeOpenshift {
mcpInformerFactory := mcfginformers.NewSharedInformerFactory(dn.mcClient,
time.Second*30,
)
mcpInformer := mcpInformerFactory.Machineconfiguration().V1().MachineConfigPools().Informer()

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
paused := dn.node.Annotations[annoKey] == annoMcpPaused

mcpEventHandler := func(obj interface{}) {
mcp := obj.(*mcfgv1.MachineConfigPool)
if mcp.GetName() != dn.mcpName {
mcpEventHandler := func(obj interface{}) {
mcp := obj.(*mcfgv1.MachineConfigPool)
if mcp.GetName() != dn.mcpName {
return
}
// Always get the latest object
newMcp := &mcfgv1.MachineConfigPool{}
newMcp, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Get(ctx, dn.mcpName, metav1.GetOptions{})
if err != nil {
glog.V(2).Infof("pauseMCP(): Failed to get MCP %s: %v", dn.mcpName, err)
return
}
if mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolDegraded) &&
mcfgv1.IsMachineConfigPoolConditionTrue(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdated) &&
mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdating) {
glog.V(2).Infof("pauseMCP(): MCP %s is ready", dn.mcpName)
if paused {
glog.V(2).Info("pauseMCP(): stop MCP informer")
cancel()
return
}
if newMcp.Spec.Paused {
glog.V(2).Infof("pauseMCP(): MCP %s was paused by other, wait...", dn.mcpName)
return
}
// Always get the latest object
newMcp, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().Get(ctx, dn.mcpName, metav1.GetOptions{})
glog.Infof("pauseMCP(): pause MCP %s", dn.mcpName)
pausePatch := []byte("{\"spec\":{\"paused\":true}}")
_, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{})
if err != nil {
glog.V(2).Infof("drainNode(): Failed to get MCP %s: %v", dn.mcpName, err)
glog.V(2).Infof("pauseMCP(): Failed to pause MCP %s: %v", dn.mcpName, err)
return
}
if mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolDegraded) &&
mcfgv1.IsMachineConfigPoolConditionTrue(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdated) &&
mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdating) {
glog.V(2).Infof("drainNode(): MCP %s is ready", dn.mcpName)
if paused {
glog.V(2).Info("drainNode(): stop MCP informer")
cancel()
return
}
if newMcp.Spec.Paused {
glog.V(2).Infof("drainNode(): MCP %s was paused by other, wait...", dn.mcpName)
return
}
glog.Infof("drainNode(): pause MCP %s", dn.mcpName)
pausePatch := []byte("{\"spec\":{\"paused\":true}}")
_, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{})
if err != nil {
glog.V(2).Infof("drainNode(): Failed to pause MCP %s: %v", dn.mcpName, err)
return
}
err = dn.annotateNode(dn.name, annoMcpPaused)
if err != nil {
glog.V(2).Infof("drainNode(): Failed to annotate node: %v", err)
return
}
paused = true
err = dn.annotateNode(dn.name, annoMcpPaused)
if err != nil {
glog.V(2).Infof("pauseMCP(): Failed to annotate node: %v", err)
return
}
if paused {
glog.Infof("drainNode(): MCP is processing, resume MCP %s", dn.mcpName)
pausePatch := []byte("{\"spec\":{\"paused\":false}}")
_, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{})
if err != nil {
glog.V(2).Infof("drainNode(): fail to resume MCP %s: %v", dn.mcpName, err)
return
}
err = dn.annotateNode(dn.name, annoDraining)
if err != nil {
glog.V(2).Infof("drainNode(): Failed to annotate node: %v", err)
return
}
paused = false
paused = true
return
}
if paused {
glog.Infof("pauseMCP(): MCP is processing, resume MCP %s", dn.mcpName)
pausePatch := []byte("{\"spec\":{\"paused\":false}}")
_, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{})
if err != nil {
glog.V(2).Infof("pauseMCP(): fail to resume MCP %s: %v", dn.mcpName, err)
return
}
glog.Infof("drainNode():MCP %s is not ready: %v, wait...", newMcp.GetName(), newMcp.Status.Conditions)
err = dn.annotateNode(dn.name, annoDraining)
if err != nil {
glog.V(2).Infof("pauseMCP(): Failed to annotate node: %v", err)
return
}
paused = false
}
glog.Infof("pauseMCP():MCP %s is not ready: %v, wait...", newMcp.GetName(), newMcp.Status.Conditions)
}

mcpInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: mcpEventHandler,
UpdateFunc: func(old, new interface{}) {
mcpEventHandler(new)
},
})

// The Draining_MCP_Paused state means the MCP work has been paused by the config daemon in previous round.
// Only check MCP state if the node is not in Draining_MCP_Paused state
if !paused {
mcpInformerFactory.Start(ctx.Done())
mcpInformerFactory.WaitForCacheSync(ctx.Done())
<-ctx.Done()
}
mcpInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: mcpEventHandler,
UpdateFunc: func(old, new interface{}) {
mcpEventHandler(new)
},
})

// The Draining_MCP_Paused state means the MCP work has been paused by the config daemon in a previous round.
// Only check the MCP state if the node is not in the Draining_MCP_Paused state.
if !paused {
mcpInformerFactory.Start(ctx.Done())
mcpInformerFactory.WaitForCacheSync(ctx.Done())
<-ctx.Done()
}

return err
}

func (dn *Daemon) drainNode() error {
if dn.disableDrain {
glog.Info("drainNode(): disable drain is true skipping drain")
return nil
}

glog.Info("drainNode(): Update prepared")
var err error

backoff := wait.Backoff{
Steps: 5,
Duration: 10 * time.Second,
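The hunk above is truncated just after the backoff definition. As a rough sketch of how such a backoff is typically consumed around the actual drain call, something like the loop below could follow; the drain.RunNodeDrain call, log wording, and error wrapping here are assumptions, not necessarily this PR's exact code.

// Hypothetical continuation: retry the node drain with the backoff defined above.
// Assumes k8s.io/apimachinery/pkg/util/wait and k8s.io/kubectl/pkg/drain, and
// that dn.drainer is the *drain.Helper passed to RunCordonOrUncordon earlier.
err = wait.ExponentialBackoff(backoff, func() (bool, error) {
	if err := drain.RunNodeDrain(dn.drainer, dn.name); err != nil {
		glog.Infof("drainNode(): drain failed, retrying: %v", err)
		return false, nil
	}
	return true, nil
})
if err != nil {
	return fmt.Errorf("drainNode(): failed to drain node %s: %v", dn.name, err)
}
return nil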
26 changes: 26 additions & 0 deletions pkg/utils/cluster.go
@@ -0,0 +1,26 @@
package utils

import (
"context"

"github.com/golang/glog"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func IsSingleNodeCluster(c client.Client) (bool, error) {
nodeList := &corev1.NodeList{}
err := c.List(context.TODO(), nodeList)
if err != nil {
glog.Errorf("IsSingleNodeCluster(): Failed to list nodes: %v", err)
return false, err
}

if len(nodeList.Items) == 1 {
glog.Infof("IsSingleNodeCluster(): one node found in the cluster")
return true, nil
}

return false, nil
}
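A minimal sketch of a unit test exercising IsSingleNodeCluster with the controller-runtime fake client; the test name, node name, and scheme choice are assumptions, and it presumes a controller-runtime version that provides fake.NewClientBuilder.

package utils

import (
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

// TestIsSingleNodeCluster is a hypothetical test, not part of this PR.
func TestIsSingleNodeCluster(t *testing.T) {
	// Build a fake client pre-loaded with exactly one Node object.
	node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-0"}}
	c := fake.NewClientBuilder().WithScheme(scheme.Scheme).WithObjects(node).Build()

	single, err := IsSingleNodeCluster(c)
	if err != nil {
		t.Fatalf("IsSingleNodeCluster returned an unexpected error: %v", err)
	}
	if !single {
		t.Errorf("expected a one-node cluster to be reported as single-node")
	}
}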