Skip to content

Commit

Permalink
Merge pull request #213 from SchSeba/support_pause_on_SNO
Browse files Browse the repository at this point in the history
Better support for openshift single node
  • Loading branch information
adrianchiris authored Dec 22, 2021
2 parents 3ba64b7 + 0e08eb7 commit de3966d
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 82 deletions.
7 changes: 7 additions & 0 deletions controllers/sriovoperatorconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ func (r *SriovOperatorConfigReconciler) Reconcile(ctx context.Context, req ctrl.
Name: constants.DEFAULT_CONFIG_NAME, Namespace: namespace}, defaultConfig)
if err != nil {
if errors.IsNotFound(err) {
singleNode, err := utils.IsSingleNodeCluster(r.Client)
if err != nil {
return reconcile.Result{}, fmt.Errorf("Couldn't check the anount of nodes in the cluster")
}

// Default Config object not found, create it.
defaultConfig.SetNamespace(namespace)
defaultConfig.SetName(constants.DEFAULT_CONFIG_NAME)
Expand All @@ -86,7 +91,9 @@ func (r *SriovOperatorConfigReconciler) Reconcile(ctx context.Context, req ctrl.
EnableOperatorWebhook: func() *bool { b := enableAdmissionController; return &b }(),
ConfigDaemonNodeSelector: map[string]string{},
LogLevel: 2,
DisableDrain: singleNode,
}

err = r.Create(context.TODO(), defaultConfig)
if err != nil {
logger.Error(err, "Failed to create default Operator Config", "Namespace",
Expand Down
7 changes: 7 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,20 @@ func createDefaultOperatorConfig(cfg *rest.Config) error {
if err != nil {
return fmt.Errorf("Couldn't create client: %v", err)
}

singleNode, err := utils.IsSingleNodeCluster(c)
if err != nil {
return fmt.Errorf("Couldn't check the anount of nodes in the cluster")
}

enableAdmissionController := os.Getenv("ENABLE_ADMISSION_CONTROLLER") == "true"
config := &sriovnetworkv1.SriovOperatorConfig{
Spec: sriovnetworkv1.SriovOperatorConfigSpec{
EnableInjector: func() *bool { b := enableAdmissionController; return &b }(),
EnableOperatorWebhook: func() *bool { b := enableAdmissionController; return &b }(),
ConfigDaemonNodeSelector: map[string]string{},
LogLevel: 2,
DisableDrain: singleNode,
},
}
name := "default"
Expand Down
188 changes: 106 additions & 82 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,9 +497,26 @@ func (dn *Daemon) nodeStateSyncHandler(generation int64) error {
}
}

if reqDrain && !dn.disableDrain {
if reqDrain {
if !dn.disableDrain {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

glog.Infof("nodeStateSyncHandler(): get drain lock for sriov daemon")
done := make(chan bool)
go dn.getDrainLock(ctx, done)
<-done
}

if utils.ClusterType == utils.ClusterTypeOpenshift {
glog.Infof("nodeStateSyncHandler(): pause MCP")
if err := dn.pauseMCP(); err != nil {
return err
}
}

glog.Info("nodeStateSyncHandler(): drain node")
if err := dn.drainNode(dn.name); err != nil {
if err := dn.drainNode(); err != nil {
return err
}
}
Expand Down Expand Up @@ -551,8 +568,10 @@ func (dn *Daemon) nodeStateSyncHandler(generation int64) error {
}

func (dn *Daemon) completeDrain() error {
if err := drain.RunCordonOrUncordon(dn.drainer, dn.node, false); err != nil {
return err
if !dn.disableDrain {
if err := drain.RunCordonOrUncordon(dn.drainer, dn.node, false); err != nil {
return err
}
}

if utils.ClusterType == utils.ClusterTypeOpenshift {
Expand Down Expand Up @@ -732,7 +751,7 @@ func (dn *Daemon) annotateNode(node, value string) error {
func (dn *Daemon) getNodeMachinePool() error {
desiredConfig, ok := dn.node.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey]
if !ok {
glog.Error("getNodeMachinePool(): Failed to find the the desiredConfig Annotation")
glog.Errorf("getNodeMachinePool(): Failed to find the the desiredConfig Annotation")
return fmt.Errorf("getNodeMachinePool(): Failed to find the the desiredConfig Annotation")
}
mc, err := dn.mcClient.MachineconfigurationV1().MachineConfigs().Get(context.TODO(), desiredConfig, metav1.GetOptions{})
Expand Down Expand Up @@ -801,99 +820,104 @@ func (dn *Daemon) getDrainLock(ctx context.Context, done chan bool) {
})
}

func (dn *Daemon) drainNode(name string) error {
glog.Info("drainNode(): Update prepared")
func (dn *Daemon) pauseMCP() error {
glog.Info("pauseMCP(): pausing MCP")
var err error

mcpInformerFactory := mcfginformers.NewSharedInformerFactory(dn.mcClient,
time.Second*30,
)
mcpInformer := mcpInformerFactory.Machineconfiguration().V1().MachineConfigPools().Informer()

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
paused := dn.node.Annotations[annoKey] == annoMcpPaused

done := make(chan bool)
go dn.getDrainLock(ctx, done)
<-done

if utils.ClusterType == utils.ClusterTypeOpenshift {
mcpInformerFactory := mcfginformers.NewSharedInformerFactory(dn.mcClient,
time.Second*30,
)
mcpInformer := mcpInformerFactory.Machineconfiguration().V1().MachineConfigPools().Informer()

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
paused := dn.node.Annotations[annoKey] == annoMcpPaused

mcpEventHandler := func(obj interface{}) {
mcp := obj.(*mcfgv1.MachineConfigPool)
if mcp.GetName() != dn.mcpName {
mcpEventHandler := func(obj interface{}) {
mcp := obj.(*mcfgv1.MachineConfigPool)
if mcp.GetName() != dn.mcpName {
return
}
// Always get the latest object
newMcp := &mcfgv1.MachineConfigPool{}
newMcp, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Get(ctx, dn.mcpName, metav1.GetOptions{})
if err != nil {
glog.V(2).Infof("pauseMCP(): Failed to get MCP %s: %v", dn.mcpName, err)
return
}
if mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolDegraded) &&
mcfgv1.IsMachineConfigPoolConditionTrue(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdated) &&
mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdating) {
glog.V(2).Infof("pauseMCP(): MCP %s is ready", dn.mcpName)
if paused {
glog.V(2).Info("pauseMCP(): stop MCP informer")
cancel()
return
}
if newMcp.Spec.Paused {
glog.V(2).Infof("pauseMCP(): MCP %s was paused by other, wait...", dn.mcpName)
return
}
// Always get the latest object
newMcp, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().Get(ctx, dn.mcpName, metav1.GetOptions{})
glog.Infof("pauseMCP(): pause MCP %s", dn.mcpName)
pausePatch := []byte("{\"spec\":{\"paused\":true}}")
_, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{})
if err != nil {
glog.V(2).Infof("drainNode(): Failed to get MCP %s: %v", dn.mcpName, err)
glog.V(2).Infof("pauseMCP(): Failed to pause MCP %s: %v", dn.mcpName, err)
return
}
if mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolDegraded) &&
mcfgv1.IsMachineConfigPoolConditionTrue(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdated) &&
mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdating) {
glog.V(2).Infof("drainNode(): MCP %s is ready", dn.mcpName)
if paused {
glog.V(2).Info("drainNode(): stop MCP informer")
cancel()
return
}
if newMcp.Spec.Paused {
glog.V(2).Infof("drainNode(): MCP %s was paused by other, wait...", dn.mcpName)
return
}
glog.Infof("drainNode(): pause MCP %s", dn.mcpName)
pausePatch := []byte("{\"spec\":{\"paused\":true}}")
_, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{})
if err != nil {
glog.V(2).Infof("drainNode(): Failed to pause MCP %s: %v", dn.mcpName, err)
return
}
err = dn.annotateNode(dn.name, annoMcpPaused)
if err != nil {
glog.V(2).Infof("drainNode(): Failed to annotate node: %v", err)
return
}
paused = true
err = dn.annotateNode(dn.name, annoMcpPaused)
if err != nil {
glog.V(2).Infof("pauseMCP(): Failed to annotate node: %v", err)
return
}
if paused {
glog.Infof("drainNode(): MCP is processing, resume MCP %s", dn.mcpName)
pausePatch := []byte("{\"spec\":{\"paused\":false}}")
_, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{})
if err != nil {
glog.V(2).Infof("drainNode(): fail to resume MCP %s: %v", dn.mcpName, err)
return
}
err = dn.annotateNode(dn.name, annoDraining)
if err != nil {
glog.V(2).Infof("drainNode(): Failed to annotate node: %v", err)
return
}
paused = false
paused = true
return
}
if paused {
glog.Infof("pauseMCP(): MCP is processing, resume MCP %s", dn.mcpName)
pausePatch := []byte("{\"spec\":{\"paused\":false}}")
_, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{})
if err != nil {
glog.V(2).Infof("pauseMCP(): fail to resume MCP %s: %v", dn.mcpName, err)
return
}
glog.Infof("drainNode():MCP %s is not ready: %v, wait...", newMcp.GetName(), newMcp.Status.Conditions)
err = dn.annotateNode(dn.name, annoDraining)
if err != nil {
glog.V(2).Infof("pauseMCP(): Failed to annotate node: %v", err)
return
}
paused = false
}
glog.Infof("pauseMCP():MCP %s is not ready: %v, wait...", newMcp.GetName(), newMcp.Status.Conditions)
}

mcpInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: mcpEventHandler,
UpdateFunc: func(old, new interface{}) {
mcpEventHandler(new)
},
})

// The Draining_MCP_Paused state means the MCP work has been paused by the config daemon in previous round.
// Only check MCP state if the node is not in Draining_MCP_Paused state
if !paused {
mcpInformerFactory.Start(ctx.Done())
mcpInformerFactory.WaitForCacheSync(ctx.Done())
<-ctx.Done()
}
mcpInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: mcpEventHandler,
UpdateFunc: func(old, new interface{}) {
mcpEventHandler(new)
},
})

// The Draining_MCP_Paused state means the MCP work has been paused by the config daemon in previous round.
// Only check MCP state if the node is not in Draining_MCP_Paused state
if !paused {
mcpInformerFactory.Start(ctx.Done())
mcpInformerFactory.WaitForCacheSync(ctx.Done())
<-ctx.Done()
}

return err
}

func (dn *Daemon) drainNode() error {
if dn.disableDrain {
glog.Info("drainNode(): disable drain is true skipping drain")
return nil
}

glog.Info("drainNode(): Update prepared")
var err error

backoff := wait.Backoff{
Steps: 5,
Duration: 10 * time.Second,
Expand Down
26 changes: 26 additions & 0 deletions pkg/utils/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package utils

import (
"context"

"github.com/golang/glog"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func IsSingleNodeCluster(c client.Client) (bool, error) {
nodeList := &corev1.NodeList{}
err := c.List(context.TODO(), nodeList)
if err != nil {
glog.Errorf("IsSingleNodeCluster(): Failed to list nodes: %v", err)
return false, err
}

if len(nodeList.Items) == 1 {
glog.Infof("IsSingleNodeCluster(): one node found in the cluster")
return true, nil
}

return false, nil
}

0 comments on commit de3966d

Please sign in to comment.