diff --git a/controllers/sriovoperatorconfig_controller.go b/controllers/sriovoperatorconfig_controller.go
index c47f29ef4..92e10075c 100644
--- a/controllers/sriovoperatorconfig_controller.go
+++ b/controllers/sriovoperatorconfig_controller.go
@@ -78,6 +78,11 @@ func (r *SriovOperatorConfigReconciler) Reconcile(ctx context.Context, req ctrl.
 		Name: constants.DEFAULT_CONFIG_NAME, Namespace: namespace}, defaultConfig)
 	if err != nil {
 		if errors.IsNotFound(err) {
+			singleNode, err := utils.IsSingleNodeCluster(r.Client)
+			if err != nil {
+				return reconcile.Result{}, fmt.Errorf("Couldn't check the number of nodes in the cluster: %v", err)
+			}
+
 			// Default Config object not found, create it.
 			defaultConfig.SetNamespace(namespace)
 			defaultConfig.SetName(constants.DEFAULT_CONFIG_NAME)
@@ -86,7 +91,9 @@ func (r *SriovOperatorConfigReconciler) Reconcile(ctx context.Context, req ctrl.
 				EnableOperatorWebhook:    func() *bool { b := enableAdmissionController; return &b }(),
 				ConfigDaemonNodeSelector: map[string]string{},
 				LogLevel:                 2,
+				DisableDrain:             singleNode,
 			}
+
 			err = r.Create(context.TODO(), defaultConfig)
 			if err != nil {
 				logger.Error(err, "Failed to create default Operator Config", "Namespace",
diff --git a/main.go b/main.go
index 63a84bac1..3050cf95e 100644
--- a/main.go
+++ b/main.go
@@ -232,6 +232,12 @@ func createDefaultOperatorConfig(cfg *rest.Config) error {
 	if err != nil {
 		return fmt.Errorf("Couldn't create client: %v", err)
 	}
+
+	singleNode, err := utils.IsSingleNodeCluster(c)
+	if err != nil {
+		return fmt.Errorf("Couldn't check the number of nodes in the cluster: %v", err)
+	}
+
 	enableAdmissionController := os.Getenv("ENABLE_ADMISSION_CONTROLLER") == "true"
 	config := &sriovnetworkv1.SriovOperatorConfig{
 		Spec: sriovnetworkv1.SriovOperatorConfigSpec{
@@ -239,6 +245,7 @@ func createDefaultOperatorConfig(cfg *rest.Config) error {
 			EnableOperatorWebhook:    func() *bool { b := enableAdmissionController; return &b }(),
 			ConfigDaemonNodeSelector: map[string]string{},
 			LogLevel:                 2,
+			DisableDrain:             singleNode,
 		},
 	}
 	name := "default"
diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go
index 6a9f5c1aa..b3efc9871 100644
--- a/pkg/daemon/daemon.go
+++ b/pkg/daemon/daemon.go
@@ -497,9 +497,26 @@ func (dn *Daemon) nodeStateSyncHandler(generation int64) error {
 		}
 	}
 
-	if reqDrain && !dn.disableDrain {
+	if reqDrain {
+		if !dn.disableDrain {
+			ctx, cancel := context.WithCancel(context.TODO())
+			defer cancel()
+
+			glog.Infof("nodeStateSyncHandler(): get drain lock for sriov daemon")
+			done := make(chan bool)
+			go dn.getDrainLock(ctx, done)
+			<-done
+		}
+
+		if utils.ClusterType == utils.ClusterTypeOpenshift {
+			glog.Infof("nodeStateSyncHandler(): pause MCP")
+			if err := dn.pauseMCP(); err != nil {
+				return err
+			}
+		}
+
 		glog.Info("nodeStateSyncHandler(): drain node")
-		if err := dn.drainNode(dn.name); err != nil {
+		if err := dn.drainNode(); err != nil {
 			return err
 		}
 	}
@@ -551,8 +568,10 @@ func (dn *Daemon) nodeStateSyncHandler(generation int64) error {
 }
 
 func (dn *Daemon) completeDrain() error {
-	if err := drain.RunCordonOrUncordon(dn.drainer, dn.node, false); err != nil {
-		return err
+	if !dn.disableDrain {
+		if err := drain.RunCordonOrUncordon(dn.drainer, dn.node, false); err != nil {
+			return err
+		}
 	}
 
 	if utils.ClusterType == utils.ClusterTypeOpenshift {
@@ -732,7 +751,7 @@ func (dn *Daemon) annotateNode(node, value string) error {
 func (dn *Daemon) getNodeMachinePool() error {
 	desiredConfig, ok := dn.node.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey]
 	if !ok {
-		glog.Error("getNodeMachinePool(): Failed to find the the desiredConfig Annotation")
the desiredConfig Annotation") + glog.Errorf("getNodeMachinePool(): Failed to find the the desiredConfig Annotation") return fmt.Errorf("getNodeMachinePool(): Failed to find the the desiredConfig Annotation") } mc, err := dn.mcClient.MachineconfigurationV1().MachineConfigs().Get(context.TODO(), desiredConfig, metav1.GetOptions{}) @@ -801,99 +820,104 @@ func (dn *Daemon) getDrainLock(ctx context.Context, done chan bool) { }) } -func (dn *Daemon) drainNode(name string) error { - glog.Info("drainNode(): Update prepared") +func (dn *Daemon) pauseMCP() error { + glog.Info("pauseMCP(): pausing MCP") var err error + + mcpInformerFactory := mcfginformers.NewSharedInformerFactory(dn.mcClient, + time.Second*30, + ) + mcpInformer := mcpInformerFactory.Machineconfiguration().V1().MachineConfigPools().Informer() + ctx, cancel := context.WithCancel(context.TODO()) defer cancel() + paused := dn.node.Annotations[annoKey] == annoMcpPaused - done := make(chan bool) - go dn.getDrainLock(ctx, done) - <-done - - if utils.ClusterType == utils.ClusterTypeOpenshift { - mcpInformerFactory := mcfginformers.NewSharedInformerFactory(dn.mcClient, - time.Second*30, - ) - mcpInformer := mcpInformerFactory.Machineconfiguration().V1().MachineConfigPools().Informer() - - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - paused := dn.node.Annotations[annoKey] == annoMcpPaused - - mcpEventHandler := func(obj interface{}) { - mcp := obj.(*mcfgv1.MachineConfigPool) - if mcp.GetName() != dn.mcpName { + mcpEventHandler := func(obj interface{}) { + mcp := obj.(*mcfgv1.MachineConfigPool) + if mcp.GetName() != dn.mcpName { + return + } + // Always get the latest object + newMcp := &mcfgv1.MachineConfigPool{} + newMcp, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Get(ctx, dn.mcpName, metav1.GetOptions{}) + if err != nil { + glog.V(2).Infof("pauseMCP(): Failed to get MCP %s: %v", dn.mcpName, err) + return + } + if mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolDegraded) && + mcfgv1.IsMachineConfigPoolConditionTrue(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdated) && + mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdating) { + glog.V(2).Infof("pauseMCP(): MCP %s is ready", dn.mcpName) + if paused { + glog.V(2).Info("pauseMCP(): stop MCP informer") + cancel() + return + } + if newMcp.Spec.Paused { + glog.V(2).Infof("pauseMCP(): MCP %s was paused by other, wait...", dn.mcpName) return } - // Always get the latest object - newMcp, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().Get(ctx, dn.mcpName, metav1.GetOptions{}) + glog.Infof("pauseMCP(): pause MCP %s", dn.mcpName) + pausePatch := []byte("{\"spec\":{\"paused\":true}}") + _, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{}) if err != nil { - glog.V(2).Infof("drainNode(): Failed to get MCP %s: %v", dn.mcpName, err) + glog.V(2).Infof("pauseMCP(): Failed to pause MCP %s: %v", dn.mcpName, err) return } - if mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolDegraded) && - mcfgv1.IsMachineConfigPoolConditionTrue(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdated) && - mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdating) { - glog.V(2).Infof("drainNode(): MCP %s is ready", dn.mcpName) - if paused { - glog.V(2).Info("drainNode(): stop 
MCP informer") - cancel() - return - } - if newMcp.Spec.Paused { - glog.V(2).Infof("drainNode(): MCP %s was paused by other, wait...", dn.mcpName) - return - } - glog.Infof("drainNode(): pause MCP %s", dn.mcpName) - pausePatch := []byte("{\"spec\":{\"paused\":true}}") - _, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{}) - if err != nil { - glog.V(2).Infof("drainNode(): Failed to pause MCP %s: %v", dn.mcpName, err) - return - } - err = dn.annotateNode(dn.name, annoMcpPaused) - if err != nil { - glog.V(2).Infof("drainNode(): Failed to annotate node: %v", err) - return - } - paused = true + err = dn.annotateNode(dn.name, annoMcpPaused) + if err != nil { + glog.V(2).Infof("pauseMCP(): Failed to annotate node: %v", err) return } - if paused { - glog.Infof("drainNode(): MCP is processing, resume MCP %s", dn.mcpName) - pausePatch := []byte("{\"spec\":{\"paused\":false}}") - _, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{}) - if err != nil { - glog.V(2).Infof("drainNode(): fail to resume MCP %s: %v", dn.mcpName, err) - return - } - err = dn.annotateNode(dn.name, annoDraining) - if err != nil { - glog.V(2).Infof("drainNode(): Failed to annotate node: %v", err) - return - } - paused = false + paused = true + return + } + if paused { + glog.Infof("pauseMCP(): MCP is processing, resume MCP %s", dn.mcpName) + pausePatch := []byte("{\"spec\":{\"paused\":false}}") + _, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{}) + if err != nil { + glog.V(2).Infof("pauseMCP(): fail to resume MCP %s: %v", dn.mcpName, err) + return } - glog.Infof("drainNode():MCP %s is not ready: %v, wait...", newMcp.GetName(), newMcp.Status.Conditions) + err = dn.annotateNode(dn.name, annoDraining) + if err != nil { + glog.V(2).Infof("pauseMCP(): Failed to annotate node: %v", err) + return + } + paused = false } + glog.Infof("pauseMCP():MCP %s is not ready: %v, wait...", newMcp.GetName(), newMcp.Status.Conditions) + } - mcpInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: mcpEventHandler, - UpdateFunc: func(old, new interface{}) { - mcpEventHandler(new) - }, - }) - - // The Draining_MCP_Paused state means the MCP work has been paused by the config daemon in previous round. - // Only check MCP state if the node is not in Draining_MCP_Paused state - if !paused { - mcpInformerFactory.Start(ctx.Done()) - mcpInformerFactory.WaitForCacheSync(ctx.Done()) - <-ctx.Done() - } + mcpInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: mcpEventHandler, + UpdateFunc: func(old, new interface{}) { + mcpEventHandler(new) + }, + }) + + // The Draining_MCP_Paused state means the MCP work has been paused by the config daemon in previous round. 
diff --git a/pkg/utils/cluster.go b/pkg/utils/cluster.go
new file mode 100644
index 000000000..1421e1e62
--- /dev/null
+++ b/pkg/utils/cluster.go
@@ -0,0 +1,26 @@
+package utils
+
+import (
+	"context"
+
+	"github.com/golang/glog"
+
+	corev1 "k8s.io/api/core/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+func IsSingleNodeCluster(c client.Client) (bool, error) {
+	nodeList := &corev1.NodeList{}
+	err := c.List(context.TODO(), nodeList)
+	if err != nil {
+		glog.Errorf("IsSingleNodeCluster(): Failed to list nodes: %v", err)
+		return false, err
+	}
+
+	if len(nodeList.Items) == 1 {
+		glog.Infof("IsSingleNodeCluster(): one node found in the cluster")
+		return true, nil
+	}
+
+	return false, nil
+}
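The new helper is easy to exercise in isolation. A minimal test sketch, assuming it lives alongside cluster.go in the same `utils` package and that the vendored controller-runtime is recent enough to provide `fake.NewClientBuilder` (older releases expose `fake.NewFakeClientWithScheme` instead):

```go
package utils

import (
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func TestIsSingleNodeCluster(t *testing.T) {
	scheme := runtime.NewScheme()
	if err := corev1.AddToScheme(scheme); err != nil {
		t.Fatal(err)
	}

	// A cluster with exactly one Node is reported as single-node.
	c := fake.NewClientBuilder().WithScheme(scheme).WithObjects(
		&corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-0"}},
	).Build()
	single, err := IsSingleNodeCluster(c)
	if err != nil || !single {
		t.Fatalf("expected single-node cluster, got single=%v err=%v", single, err)
	}

	// With a second Node the helper reports a multi-node cluster.
	c = fake.NewClientBuilder().WithScheme(scheme).WithObjects(
		&corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-0"}},
		&corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}},
	).Build()
	single, err = IsSingleNodeCluster(c)
	if err != nil || single {
		t.Fatalf("expected multi-node cluster, got single=%v err=%v", single, err)
	}
}
```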