diff --git a/bindata/manifests/sriov-config-service/kubernetes/sriov-config-post-network-service.yaml b/bindata/manifests/sriov-config-service/kubernetes/sriov-config-post-network-service.yaml index 08c8bce76..1b147cef9 100644 --- a/bindata/manifests/sriov-config-service/kubernetes/sriov-config-post-network-service.yaml +++ b/bindata/manifests/sriov-config-service/kubernetes/sriov-config-post-network-service.yaml @@ -6,7 +6,7 @@ contents: | [Service] Type=oneshot - ExecStart=/var/lib/sriov/sriov-network-config-daemon -v 2 --zap-log-level 2 service --phase post + ExecStart=/var/lib/sriov/sriov-network-config-daemon service --phase post StandardOutput=journal+console [Install] diff --git a/bindata/manifests/sriov-config-service/kubernetes/sriov-config-service.yaml b/bindata/manifests/sriov-config-service/kubernetes/sriov-config-service.yaml index f75062dab..dce2d489b 100644 --- a/bindata/manifests/sriov-config-service/kubernetes/sriov-config-service.yaml +++ b/bindata/manifests/sriov-config-service/kubernetes/sriov-config-service.yaml @@ -7,7 +7,7 @@ contents: | [Service] Type=oneshot - ExecStart=/var/lib/sriov/sriov-network-config-daemon -v 2 --zap-log-level 2 service --phase pre + ExecStart=/var/lib/sriov/sriov-network-config-daemon service --phase pre StandardOutput=journal+console [Install] diff --git a/bindata/manifests/sriov-config-service/openshift/sriov-config-service.yaml b/bindata/manifests/sriov-config-service/openshift/sriov-config-service.yaml index 706679430..4d13c751c 100644 --- a/bindata/manifests/sriov-config-service/openshift/sriov-config-service.yaml +++ b/bindata/manifests/sriov-config-service/openshift/sriov-config-service.yaml @@ -21,7 +21,7 @@ spec: [Service] Type=oneshot - ExecStart=/var/lib/sriov/sriov-network-config-daemon service -v {{ .LogLevel }} --zap-log-level {{ .LogLevel }} --phase pre + ExecStart=/var/lib/sriov/sriov-network-config-daemon service --phase pre StandardOutput=journal+console [Install] @@ -38,7 +38,7 @@ spec: [Service] Type=oneshot - ExecStart=/var/lib/sriov/sriov-network-config-daemon service -v {{ .LogLevel }} --zap-log-level {{ .LogLevel }} --phase post + ExecStart=/var/lib/sriov/sriov-network-config-daemon service --phase post StandardOutput=journal+console [Install] diff --git a/cmd/sriov-network-config-daemon/service.go b/cmd/sriov-network-config-daemon/service.go index 0209583cc..95cbb1d12 100644 --- a/cmd/sriov-network-config-daemon/service.go +++ b/cmd/sriov-network-config-daemon/service.go @@ -85,6 +85,7 @@ func runServiceCmd(cmd *cobra.Command, args []string) error { } // init logger snolog.InitLog() + snolog.SetLogLevel(2) setupLog := log.Log.WithName("sriov-config-service").WithValues("phase", phaseArg) setupLog.V(0).Info("Starting sriov-config-service", "version", version.Version) @@ -168,7 +169,7 @@ func phasePre(setupLog logr.Logger, conf *systemd.SriovConfig, hostHelpers helpe func phasePost(setupLog logr.Logger, conf *systemd.SriovConfig, hostHelpers helper.HostHelpersInterface) error { setupLog.V(0).Info("check result of the Pre phase") - prePhaseResult, err := systemd.ReadSriovResult() + prePhaseResult, _, err := systemd.ReadSriovResult() if err != nil { return fmt.Errorf("failed to read result of the pre phase: %v", err) } diff --git a/cmd/sriov-network-config-daemon/start.go b/cmd/sriov-network-config-daemon/start.go index b1ad0f667..9b0846d34 100644 --- a/cmd/sriov-network-config-daemon/start.go +++ b/cmd/sriov-network-config-daemon/start.go @@ -18,25 +18,27 @@ package main import ( "context" "fmt" - "net" "net/url" "os" "strings" "time" + ocpconfigapi "github.com/openshift/api/config/v1" "github.com/spf13/cobra" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "k8s.io/client-go/util/connrotation" - "sigs.k8s.io/controller-runtime/pkg/client" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" - - configv1 "github.com/openshift/api/config/v1" - mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned" @@ -89,6 +91,8 @@ var ( manageSoftwareBridges bool ovsSocketPath string } + + scheme = runtime.NewScheme() ) func init() { @@ -100,13 +104,17 @@ func init() { startCmd.PersistentFlags().BoolVar(&startOpts.parallelNicConfig, "parallel-nic-config", false, "perform NIC configuration in parallel") startCmd.PersistentFlags().BoolVar(&startOpts.manageSoftwareBridges, "manage-software-bridges", false, "enable management of software bridges") startCmd.PersistentFlags().StringVar(&startOpts.ovsSocketPath, "ovs-socket-path", vars.OVSDBSocketPath, "path for OVSDB socket") -} -func runStartCmd(cmd *cobra.Command, args []string) error { - // init logger + // Init Scheme + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(sriovnetworkv1.AddToScheme(scheme)) + utilruntime.Must(ocpconfigapi.AddToScheme(scheme)) + + // Init logger snolog.InitLog() - setupLog := log.Log.WithName("sriov-network-config-daemon") +} +func configGlobalVariables() error { // Mark that we are running inside a container vars.UsingSystemdMode = false if startOpts.systemd { @@ -132,102 +140,112 @@ func runStartCmd(cmd *cobra.Command, args []string) error { } } - // This channel is used to ensure all spawned goroutines exit when we exit. - stopCh := make(chan struct{}) - defer close(stopCh) + vars.Scheme = scheme - // This channel is used to signal Run() something failed and to jump ship. - // It's purely a chan<- in the Daemon struct for goroutines to write to, and - // a <-chan in Run() for the main thread to listen on. - exitCh := make(chan error) - defer close(exitCh) - - // This channel is to make sure main thread will wait until the writer finish - // to report lastSyncError in SriovNetworkNodeState object. - syncCh := make(chan struct{}) - defer close(syncCh) + return nil +} - refreshCh := make(chan daemon.Message) - defer close(refreshCh) +func UseKubeletKubeConfig() { + fnLogger := log.Log.WithName("sriov-network-config-daemon") - var config *rest.Config - var err error - - // On openshift we use the kubeconfig from kubelet on the node where the daemon is running - // this allow us to improve security as every daemon has access only to its own node - if vars.ClusterType == consts.ClusterTypeOpenshift { - kubeconfig, err := clientcmd.LoadFromFile("/host/etc/kubernetes/kubeconfig") - if err != nil { - setupLog.Error(err, "failed to load kubelet kubeconfig") - } - clusterName := kubeconfig.Contexts[kubeconfig.CurrentContext].Cluster - apiURL := kubeconfig.Clusters[clusterName].Server + kubeconfig, err := clientcmd.LoadFromFile("/host/etc/kubernetes/kubeconfig") + if err != nil { + fnLogger.Error(err, "failed to load kubelet kubeconfig") + } + clusterName := kubeconfig.Contexts[kubeconfig.CurrentContext].Cluster + apiURL := kubeconfig.Clusters[clusterName].Server - urlPath, err := url.Parse(apiURL) - if err != nil { - setupLog.Error(err, "failed to parse api url from kubelet kubeconfig") - } + urlPath, err := url.Parse(apiURL) + if err != nil { + fnLogger.Error(err, "failed to parse api url from kubelet kubeconfig") + } - // The kubernetes in-cluster functions don't let you override the apiserver - // directly; gotta "pass" it via environment vars. - setupLog.V(0).Info("overriding kubernetes api", "new-url", apiURL) - err = os.Setenv("KUBERNETES_SERVICE_HOST", urlPath.Hostname()) - if err != nil { - setupLog.Error(err, "failed to set KUBERNETES_SERVICE_HOST environment variable") - } - err = os.Setenv("KUBERNETES_SERVICE_PORT", urlPath.Port()) - if err != nil { - setupLog.Error(err, "failed to set KUBERNETES_SERVICE_PORT environment variable") - } + // The kubernetes in-cluster functions don't let you override the apiserver + // directly; gotta "pass" it via environment vars. + fnLogger.V(0).Info("overriding kubernetes api", "new-url", apiURL) + err = os.Setenv("KUBERNETES_SERVICE_HOST", urlPath.Hostname()) + if err != nil { + fnLogger.Error(err, "failed to set KUBERNETES_SERVICE_HOST environment variable") } + err = os.Setenv("KUBERNETES_SERVICE_PORT", urlPath.Port()) + if err != nil { + fnLogger.Error(err, "failed to set KUBERNETES_SERVICE_PORT environment variable") + } +} - kubeconfig := os.Getenv("KUBECONFIG") - if kubeconfig != "" { - config, err = clientcmd.BuildConfigFromFlags("", kubeconfig) - } else { - // creates the in-cluster config - config, err = rest.InClusterConfig() +func getOperatorConfig(kClient runtimeclient.Client) (*sriovnetworkv1.SriovOperatorConfig, error) { + // Init feature gates once to prevent race conditions. + defaultConfig := &sriovnetworkv1.SriovOperatorConfig{} + err := kClient.Get(context.Background(), types.NamespacedName{Namespace: vars.Namespace, Name: consts.DefaultConfigName}, defaultConfig) + if err != nil { + return nil, err } + return defaultConfig, nil +} +func initFeatureGates(kClient runtimeclient.Client) (featuregate.FeatureGate, error) { + fnLogger := log.Log.WithName("initFeatureGates") + // Init feature gates once to prevent race conditions. + defaultConfig, err := getOperatorConfig(kClient) if err != nil { - return err + fnLogger.Error(err, "Failed to get default SriovOperatorConfig object") + return nil, err } + featureGates := featuregate.New() + featureGates.Init(defaultConfig.Spec.FeatureGates) + fnLogger.Info("Enabled featureGates", "featureGates", featureGates.String()) - vars.Config = config - vars.Scheme = scheme.Scheme + return featureGates, nil +} - closeAllConns, err := updateDialer(config) +func initLogLevel(kClient runtimeclient.Client) error { + fnLogger := log.Log.WithName("initLogLevel") + // Init feature gates once to prevent race conditions. + defaultConfig, err := getOperatorConfig(kClient) if err != nil { + fnLogger.Error(err, "Failed to get default SriovOperatorConfig object") return err } + fnLogger.V(2).Info("DEBUG", defaultConfig) + snolog.SetLogLevel(defaultConfig.Spec.LogLevel) + fnLogger.V(2).Info("logLevel sets", "logLevel", defaultConfig.Spec.LogLevel) + return nil +} - err = sriovnetworkv1.AddToScheme(scheme.Scheme) +func runStartCmd(cmd *cobra.Command, args []string) error { + setupLog := log.Log.WithName("sriov-network-config-daemon") + stopSignalCh := ctrl.SetupSignalHandler() + + // Load global variables + err := configGlobalVariables() if err != nil { - setupLog.Error(err, "failed to load sriov network CRDs to scheme") + setupLog.Error(err, "unable to config global variables") return err } - err = mcfgv1.AddToScheme(scheme.Scheme) - if err != nil { - setupLog.Error(err, "failed to load machine config CRDs to scheme") - return err + var config *rest.Config + + // On openshift we use the kubeconfig from kubelet on the node where the daemon is running + // this allow us to improve security as every daemon has access only to its own node + if vars.ClusterType == consts.ClusterTypeOpenshift { + UseKubeletKubeConfig() } - err = configv1.Install(scheme.Scheme) - if err != nil { - setupLog.Error(err, "failed to load openshift config CRDs to scheme") - return err + kubeconfig := os.Getenv("KUBECONFIG") + if kubeconfig != "" { + config, err = clientcmd.BuildConfigFromFlags("", kubeconfig) + } else { + // creates the in-cluster config + config, err = rest.InClusterConfig() } - kClient, err := client.New(config, client.Options{Scheme: scheme.Scheme}) if err != nil { - setupLog.Error(err, "couldn't create client") - os.Exit(1) + return err } + vars.Config = config + config.Timeout = 5 * time.Second - snclient := snclientset.NewForConfigOrDie(config) - kubeclient := kubernetes.NewForConfigOrDie(config) - + // create helpers hostHelpers, err := helper.NewDefaultHostHelpers() if err != nil { setupLog.Error(err, "failed to create hostHelpers") @@ -240,88 +258,108 @@ func runStartCmd(cmd *cobra.Command, args []string) error { return err } - config.Timeout = 5 * time.Second - writerclient := snclientset.NewForConfigOrDie(config) + // create clients + snclient := snclientset.NewForConfigOrDie(config) + kubeclient := kubernetes.NewForConfigOrDie(config) + kClient, err := runtimeclient.New( + config, + runtimeclient.Options{ + Scheme: vars.Scheme}) + if err != nil { + setupLog.Error(err, "couldn't create generic client") + os.Exit(1) + } - eventRecorder := daemon.NewEventRecorder(writerclient, kubeclient) + eventRecorder := daemon.NewEventRecorder(snclient, kubeclient, scheme) defer eventRecorder.Shutdown() - setupLog.V(0).Info("starting node writer") - nodeWriter := daemon.NewNodeStateStatusWriter(writerclient, - closeAllConns, - eventRecorder, - hostHelpers, - platformHelper) - - nodeInfo, err := kubeclient.CoreV1().Nodes().Get(context.Background(), startOpts.nodeName, v1.GetOptions{}) - if err == nil { - for key, pType := range vars.PlatformsMap { - if strings.Contains(strings.ToLower(nodeInfo.Spec.ProviderID), strings.ToLower(key)) { - vars.PlatformType = pType - } - } - } else { + nodeInfo, err := kubeclient.CoreV1().Nodes().Get(context.Background(), vars.NodeName, v1.GetOptions{}) + if err != nil { setupLog.Error(err, "failed to fetch node state, exiting", "node-name", startOpts.nodeName) return err } + + // check for platform + for key, pType := range vars.PlatformsMap { + if strings.Contains(strings.ToLower(nodeInfo.Spec.ProviderID), strings.ToLower(key)) { + vars.PlatformType = pType + } + } setupLog.Info("Running on", "platform", vars.PlatformType.String()) + // Initial supported nic IDs if err := sriovnetworkv1.InitNicIDMapFromConfigMap(kubeclient, vars.Namespace); err != nil { setupLog.Error(err, "failed to run init NicIdMap") return err } - eventRecorder.SendEvent("ConfigDaemonStart", "Config Daemon starting") + eventRecorder.SendEvent(stopSignalCh, "ConfigDaemonStart", "Config Daemon starting") - // block the deamon process until nodeWriter finish first its run - err = nodeWriter.RunOnce() + fg, err := initFeatureGates(kClient) if err != nil { - setupLog.Error(err, "failed to run writer") + setupLog.Error(err, "failed to initialize feature gates") return err } - go nodeWriter.Run(stopCh, refreshCh, syncCh) - // Init feature gates once to prevent race conditions. - defaultConfig := &sriovnetworkv1.SriovOperatorConfig{} - err = kClient.Get(context.Background(), types.NamespacedName{Namespace: vars.Namespace, Name: consts.DefaultConfigName}, defaultConfig) + if err := initLogLevel(kClient); err != nil { + setupLog.Error(err, "failed to initialize log level") + return err + } + + // Init manager + setupLog.V(0).Info("Starting SR-IOV Network Config Daemon") + nodeStateSelector, err := fields.ParseSelector(fmt.Sprintf("metadata.name=%s,metadata.namespace=%s", vars.NodeName, vars.Namespace)) if err != nil { - log.Log.Error(err, "Failed to get default SriovOperatorConfig object") + setupLog.Error(err, "failed to parse sriovNetworkNodeState name selector") + return err + } + operatorConfigSelector, err := fields.ParseSelector(fmt.Sprintf("metadata.name=%s,metadata.namespace=%s", consts.DefaultConfigName, vars.Namespace)) + if err != nil { + setupLog.Error(err, "failed to parse sriovOperatorConfig name selector") return err } - featureGates := featuregate.New() - featureGates.Init(defaultConfig.Spec.FeatureGates) - vars.MlxPluginFwReset = featureGates.IsEnabled(consts.MellanoxFirmwareResetFeatureGate) - log.Log.Info("Enabled featureGates", "featureGates", featureGates.String()) - setupLog.V(0).Info("Starting SriovNetworkConfigDaemon") - err = daemon.New( + mgr, err := ctrl.NewManager(vars.Config, ctrl.Options{ + Scheme: vars.Scheme, + Metrics: server.Options{BindAddress: "0"}, // disable metrics server for now as the daemon runs with hostNetwork + Cache: cache.Options{ // cache only the SriovNetworkNodeState with the node name + ByObject: map[runtimeclient.Object]cache.ByObject{ + &sriovnetworkv1.SriovNetworkNodeState{}: {Field: nodeStateSelector}, + &sriovnetworkv1.SriovOperatorConfig{}: {Field: operatorConfigSelector}}}, + }) + if err != nil { + setupLog.Error(err, "unable to create manager") + os.Exit(1) + } + + dm := daemon.New( kClient, snclient, kubeclient, hostHelpers, platformHelper, - exitCh, - stopCh, - syncCh, - refreshCh, eventRecorder, - featureGates, - startOpts.disabledPlugins, - ).Run(stopCh, exitCh) - if err != nil { - setupLog.Error(err, "failed to run daemon") + fg, + startOpts.disabledPlugins) + + // Init Daemon configuration on the node + if err = dm.DaemonInitialization(); err != nil { + setupLog.Error(err, "unable to initialize daemon") + os.Exit(1) + } + + // Setup reconcile loop with manager + if err = dm.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create setup daemon manager for SriovNetworkNodeState") + os.Exit(1) } - setupLog.V(0).Info("Shutting down SriovNetworkConfigDaemon") - return err -} -// updateDialer instruments a restconfig with a dial. the returned function allows forcefully closing all active connections. -func updateDialer(clientConfig *rest.Config) (func(), error) { - if clientConfig.Transport != nil || clientConfig.Dial != nil { - return nil, fmt.Errorf("there is already a transport or dialer configured") + // Setup reconcile loop with manager + if err = daemon.NewOperatorConfigReconcile(kClient).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create setup daemon manager for OperatorConfig") + os.Exit(1) } - f := &net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second} - d := connrotation.NewDialer(f.DialContext) - clientConfig.Dial = d.DialContext - return d.CloseAll, nil + + setupLog.Info("Starting Manager") + return mgr.Start(stopSignalCh) } diff --git a/controllers/drain_controller.go b/controllers/drain_controller.go index 5d976a380..796e44dda 100644 --- a/controllers/drain_controller.go +++ b/controllers/drain_controller.go @@ -106,7 +106,12 @@ func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl } // create the drain state annotation if it doesn't exist in the sriovNetworkNodeState object - nodeStateDrainAnnotationCurrent, nodeStateExist, err := dr.ensureAnnotationExists(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent) + nodeStateDrainAnnotationCurrent, currentNodeStateExist, err := dr.ensureAnnotationExists(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent) + if err != nil { + reqLogger.Error(err, "failed to ensure nodeStateDrainAnnotation") + return ctrl.Result{}, err + } + _, desireNodeStateExist, err := dr.ensureAnnotationExists(ctx, nodeNetworkState, constants.NodeStateDrainAnnotation) if err != nil { reqLogger.Error(err, "failed to ensure nodeStateDrainAnnotation") return ctrl.Result{}, err @@ -120,7 +125,7 @@ func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl } // requeue the request if we needed to add any of the annotations - if !nodeExist || !nodeStateExist { + if !nodeExist || !currentNodeStateExist || !desireNodeStateExist { return ctrl.Result{Requeue: true}, nil } reqLogger.V(2).Info("Drain annotations", "nodeAnnotation", nodeDrainAnnotation, "nodeStateAnnotation", nodeStateDrainAnnotationCurrent) diff --git a/controllers/drain_controller_test.go b/controllers/drain_controller_test.go index de3fe0884..dd1bc075c 100644 --- a/controllers/drain_controller_test.go +++ b/controllers/drain_controller_test.go @@ -428,7 +428,7 @@ func createNodeWithLabel(ctx context.Context, nodeName string, label string) (*c ObjectMeta: metav1.ObjectMeta{ Name: nodeName, Namespace: vars.Namespace, - Labels: map[string]string{ + Annotations: map[string]string{ constants.NodeStateDrainAnnotationCurrent: constants.DrainIdle, }, }, diff --git a/controllers/helper.go b/controllers/helper.go index bf918bd3f..fec7f3d9d 100644 --- a/controllers/helper.go +++ b/controllers/helper.go @@ -108,14 +108,7 @@ type DrainStateAnnotationPredicate struct { } func (DrainStateAnnotationPredicate) Create(e event.CreateEvent) bool { - if e.Object == nil { - return false - } - - if _, hasAnno := e.Object.GetLabels()[constants.NodeStateDrainAnnotationCurrent]; hasAnno { - return true - } - return false + return e.Object != nil } func (DrainStateAnnotationPredicate) Update(e event.UpdateEvent) bool { diff --git a/go.mod b/go.mod index eabcfa6a5..32585d4b4 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,6 @@ require ( github.com/vishvananda/netns v0.0.4 go.uber.org/zap v1.25.0 golang.org/x/net v0.33.0 - golang.org/x/time v0.3.0 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c gopkg.in/yaml.v3 v3.0.1 k8s.io/api v0.28.3 @@ -151,6 +150,7 @@ require ( golang.org/x/sys v0.28.0 // indirect golang.org/x/term v0.27.0 // indirect golang.org/x/text v0.21.0 // indirect + golang.org/x/time v0.3.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/appengine v1.6.8 // indirect diff --git a/hack/run-e2e-conformance.sh b/hack/run-e2e-conformance.sh index 1e8666098..bb76fc896 100755 --- a/hack/run-e2e-conformance.sh +++ b/hack/run-e2e-conformance.sh @@ -7,4 +7,4 @@ GOPATH="${GOPATH:-$HOME/go}" JUNIT_OUTPUT="${JUNIT_OUTPUT:-/tmp/artifacts}" export PATH=$PATH:$GOPATH/bin -${root}/bin/ginkgo -output-dir=$JUNIT_OUTPUT --junit-report "unit_report.xml" -v "$SUITE" -- -report=$JUNIT_OUTPUT +${root}/bin/ginkgo --timeout 3h -output-dir=$JUNIT_OUTPUT --junit-report "unit_report.xml" -v "$SUITE" -- -report=$JUNIT_OUTPUT diff --git a/hack/virtual-cluster-redeploy.sh b/hack/virtual-cluster-redeploy.sh index a16aeeaf4..7bc92af8e 100755 --- a/hack/virtual-cluster-redeploy.sh +++ b/hack/virtual-cluster-redeploy.sh @@ -28,6 +28,21 @@ if [ $CLUSTER_TYPE == "openshift" ]; then registry="default-route-openshift-image-registry.apps.${cluster_name}.${domain_name}" podman login -u serviceaccount -p ${pass} $registry --tls-verify=false + export ADMISSION_CONTROLLERS_ENABLED=true + export SKIP_VAR_SET="" + export NAMESPACE="openshift-sriov-network-operator" + export OPERATOR_NAMESPACE=$NAMESPACE + export MULTUS_NAMESPACE="openshift-multus" + export OPERATOR_EXEC=kubectl + export CLUSTER_TYPE=openshift + export DEV_MODE=TRUE + export CLUSTER_HAS_EMULATED_PF=TRUE + export OPERATOR_LEADER_ELECTION_ENABLE=true + export METRICS_EXPORTER_PROMETHEUS_OPERATOR_ENABLED=true + export METRICS_EXPORTER_PROMETHEUS_DEPLOY_RULES=true + export METRICS_EXPORTER_PROMETHEUS_OPERATOR_SERVICE_ACCOUNT=${METRICS_EXPORTER_PROMETHEUS_OPERATOR_SERVICE_ACCOUNT:-"prometheus-k8s"} + export METRICS_EXPORTER_PROMETHEUS_OPERATOR_NAMESPACE=${METRICS_EXPORTER_PROMETHEUS_OPERATOR_NAMESPACE:-"openshift-monitoring"} + export SRIOV_NETWORK_OPERATOR_IMAGE="$registry/$NAMESPACE/sriov-network-operator:latest" export SRIOV_NETWORK_CONFIG_DAEMON_IMAGE="$registry/$NAMESPACE/sriov-network-config-daemon:latest" export SRIOV_NETWORK_WEBHOOK_IMAGE="$registry/$NAMESPACE/sriov-network-operator-webhook:latest" diff --git a/main.go b/main.go index 7195c59c3..d7bf589ab 100644 --- a/main.go +++ b/main.go @@ -114,7 +114,7 @@ func main() { LeaderElectionID: consts.LeaderElectionID, }) if err != nil { - setupLog.Error(err, "unable to start leader election manager") + setupLog.Error(err, "unable to create leader election manager") os.Exit(1) } @@ -134,7 +134,7 @@ func main() { Cache: cache.Options{DefaultNamespaces: map[string]cache.Config{vars.Namespace: {}}}, }) if err != nil { - setupLog.Error(err, "unable to start manager") + setupLog.Error(err, "unable to create manager") os.Exit(1) } @@ -166,7 +166,6 @@ func main() { err = mgrGlobal.GetCache().IndexField(context.Background(), &sriovnetworkv1.OVSNetwork{}, "spec.networkNamespace", func(o client.Object) []string { return []string{o.(*sriovnetworkv1.OVSNetwork).Spec.NetworkNamespace} }) - if err != nil { setupLog.Error(err, "unable to create index field for cache") os.Exit(1) diff --git a/pkg/daemon/config.go b/pkg/daemon/config.go new file mode 100644 index 000000000..ddeba84db --- /dev/null +++ b/pkg/daemon/config.go @@ -0,0 +1,61 @@ +package daemon + +import ( + "context" + "reflect" + + "k8s.io/apimachinery/pkg/api/errors" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + + sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" + snolog "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/log" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars" +) + +type OperatorConfigReconcile struct { + client client.Client + latestFeatureGates map[string]bool +} + +func NewOperatorConfigReconcile(client client.Client) *OperatorConfigReconcile { + return &OperatorConfigReconcile{client: client, latestFeatureGates: make(map[string]bool)} +} + +func (oc *OperatorConfigReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + reqLogger := log.FromContext(ctx).WithName("Reconcile") + operatorConfig := &sriovnetworkv1.SriovOperatorConfig{} + err := oc.client.Get(ctx, client.ObjectKey{Namespace: req.Namespace, Name: req.Name}, operatorConfig) + if err != nil { + if errors.IsNotFound(err) { + reqLogger.Info("OperatorConfig doesn't exist", "name", req.Name, "namespace", req.Namespace) + return ctrl.Result{}, nil + } + reqLogger.Error(err, "Failed to operatorConfig", "name", req.Name, "namespace", req.Namespace) + return ctrl.Result{}, err + } + + // update log level + snolog.SetLogLevel(operatorConfig.Spec.LogLevel) + + newDisableDrain := operatorConfig.Spec.DisableDrain + if vars.DisableDrain != newDisableDrain { + vars.DisableDrain = newDisableDrain + log.Log.Info("Set Disable Drain", "value", vars.DisableDrain) + } + + if !reflect.DeepEqual(oc.latestFeatureGates, operatorConfig.Spec.FeatureGates) { + vars.FeatureGate.Init(operatorConfig.Spec.FeatureGates) + oc.latestFeatureGates = operatorConfig.Spec.FeatureGates + log.Log.Info("Updated featureGates", "featureGates", vars.FeatureGate.String()) + } + + return ctrl.Result{}, nil +} + +func (oc *OperatorConfigReconcile) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&sriovnetworkv1.SriovOperatorConfig{}). + Complete(oc) +} diff --git a/pkg/daemon/config_test.go b/pkg/daemon/config_test.go new file mode 100644 index 000000000..ad84cd5bf --- /dev/null +++ b/pkg/daemon/config_test.go @@ -0,0 +1,170 @@ +package daemon_test + +import ( + "context" + "sync" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/scheme" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/daemon" + snolog "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/log" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars" +) + +var _ = Describe("Daemon OperatorConfig Controller", Ordered, func() { + var cancel context.CancelFunc + var ctx context.Context + + BeforeAll(func() { + By("Setup controller manager") + k8sManager, err := ctrl.NewManager(cfg, ctrl.Options{ + Scheme: scheme.Scheme, + }) + Expect(err).ToNot(HaveOccurred()) + + configController := daemon.NewOperatorConfigReconcile(k8sClient) + err = configController.SetupWithManager(k8sManager) + Expect(err).ToNot(HaveOccurred()) + + ctx, cancel = context.WithCancel(context.Background()) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + defer GinkgoRecover() + By("Start controller manager") + err := k8sManager.Start(ctx) + Expect(err).ToNot(HaveOccurred()) + }() + + DeferCleanup(func() { + By("Shutdown controller manager") + cancel() + wg.Wait() + }) + + err = k8sClient.Create(ctx, &corev1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Name: "default", Namespace: "default"}}) + Expect(err).ToNot(HaveOccurred()) + }) + + BeforeEach(func() { + Expect(k8sClient.DeleteAllOf(context.Background(), &sriovnetworkv1.SriovOperatorConfig{}, client.InNamespace(testNamespace))).ToNot(HaveOccurred()) + }) + + Context("LogLevel", func() { + It("should configure the log level base on sriovOperatorConfig", func() { + soc := &sriovnetworkv1.SriovOperatorConfig{ObjectMeta: metav1.ObjectMeta{ + Name: consts.DefaultConfigName, + Namespace: testNamespace, + }, + Spec: sriovnetworkv1.SriovOperatorConfigSpec{ + LogLevel: 1, + }, + } + + err := k8sClient.Create(ctx, soc) + Expect(err).ToNot(HaveOccurred()) + validateExpectedLogLevel(1) + + }) + + It("should update the log level in runtime", func() { + soc := &sriovnetworkv1.SriovOperatorConfig{ObjectMeta: metav1.ObjectMeta{ + Name: consts.DefaultConfigName, + Namespace: testNamespace, + }, + Spec: sriovnetworkv1.SriovOperatorConfigSpec{ + LogLevel: 1, + }, + } + + err := k8sClient.Create(ctx, soc) + Expect(err).ToNot(HaveOccurred()) + validateExpectedLogLevel(1) + + soc.Spec.LogLevel = 2 + err = k8sClient.Update(ctx, soc) + Expect(err).ToNot(HaveOccurred()) + validateExpectedLogLevel(2) + }) + }) + + Context("Disable Drain", func() { + It("should update the skip drain flag", func() { + soc := &sriovnetworkv1.SriovOperatorConfig{ObjectMeta: metav1.ObjectMeta{ + Name: consts.DefaultConfigName, + Namespace: testNamespace, + }, + Spec: sriovnetworkv1.SriovOperatorConfigSpec{ + DisableDrain: true, + }, + } + + err := k8sClient.Create(ctx, soc) + Expect(err).ToNot(HaveOccurred()) + validateExpectedDrain(true) + + soc.Spec.DisableDrain = false + err = k8sClient.Update(ctx, soc) + Expect(err).ToNot(HaveOccurred()) + validateExpectedDrain(false) + }) + }) + + Context("Feature gates", func() { + It("should update the feature gates struct", func() { + soc := &sriovnetworkv1.SriovOperatorConfig{ObjectMeta: metav1.ObjectMeta{ + Name: consts.DefaultConfigName, + Namespace: testNamespace, + }, + Spec: sriovnetworkv1.SriovOperatorConfigSpec{ + FeatureGates: map[string]bool{ + "test": true, + "bla": true, + }, + }, + } + + err := k8sClient.Create(ctx, soc) + Expect(err).ToNot(HaveOccurred()) + EventuallyWithOffset(1, func(g Gomega) { + g.Expect(vars.FeatureGate.IsEnabled("test")).To(BeTrue()) + }, "15s", "3s").Should(Succeed()) + EventuallyWithOffset(1, func(g Gomega) { + g.Expect(vars.FeatureGate.IsEnabled("bla")).To(BeTrue()) + }, "15s", "3s").Should(Succeed()) + + soc.Spec.FeatureGates["test"] = false + err = k8sClient.Update(ctx, soc) + Expect(err).ToNot(HaveOccurred()) + EventuallyWithOffset(1, func(g Gomega) { + g.Expect(vars.FeatureGate.IsEnabled("test")).To(BeFalse()) + }, "15s", "3s").Should(Succeed()) + EventuallyWithOffset(1, func(g Gomega) { + g.Expect(vars.FeatureGate.IsEnabled("bla")).To(BeTrue()) + }, "15s", "3s").Should(Succeed()) + }) + }) +}) + +func validateExpectedLogLevel(level int) { + EventuallyWithOffset(1, func(g Gomega) { + g.Expect(snolog.GetLogLevel()).To(Equal(level)) + }, "15s", "3s").Should(Succeed()) +} + +func validateExpectedDrain(disableDrain bool) { + EventuallyWithOffset(1, func(g Gomega) { + g.Expect(vars.DisableDrain).To(Equal(disableDrain)) + }, "15s", "3s").Should(Succeed()) +} diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 53fe82b8b..422621e00 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -3,29 +3,22 @@ package daemon import ( "context" "fmt" - "math/rand" - "reflect" - "sync" "time" - "golang.org/x/time/rate" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned" - sninformer "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/informers/externalversions" "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/featuregate" "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/helper" - snolog "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/log" "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms" plugin "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/plugins" "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/systemd" @@ -33,58 +26,27 @@ import ( "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars" ) -const ( - // updateDelay is the baseline speed at which we react to changes. We don't - // need to react in milliseconds as any change would involve rebooting the node. - updateDelay = 5 * time.Second - // maxUpdateBackoff is the maximum time to react to a change as we back off - // in the face of errors. - maxUpdateBackoff = 60 * time.Second -) - -type Message struct { - syncStatus string - lastSyncError string -} - -type Daemon struct { +type DaemonReconcile struct { client client.Client sriovClient snclientset.Interface // kubeClient allows interaction with Kubernetes, including the node we are running on. kubeClient kubernetes.Interface - desiredNodeState *sriovnetworkv1.SriovNetworkNodeState - currentNodeState *sriovnetworkv1.SriovNetworkNodeState - - // list of disabled plugins - disabledPlugins []string - - loadedPlugins map[string]plugin.VendorPlugin - HostHelpers helper.HostHelpersInterface platformHelpers platforms.Interface - // channel used by callbacks to signal Run() of an error - exitCh chan<- error - - // channel used to ensure all spawned goroutines exit when we exit. - stopCh <-chan struct{} - - syncCh <-chan struct{} - - refreshCh chan<- Message - - mu *sync.Mutex - - disableDrain bool - - workqueue workqueue.RateLimitingInterface - eventRecorder *EventRecorder featureGate featuregate.FeatureGate + + // list of disabled plugins + disabledPlugins []string + + loadedPlugins map[string]plugin.VendorPlugin + lastAppliedGeneration int64 + disableDrain bool } func New( @@ -93,398 +55,304 @@ func New( kubeClient kubernetes.Interface, hostHelpers helper.HostHelpersInterface, platformHelper platforms.Interface, - exitCh chan<- error, - stopCh <-chan struct{}, - syncCh <-chan struct{}, - refreshCh chan<- Message, er *EventRecorder, featureGates featuregate.FeatureGate, disabledPlugins []string, -) *Daemon { - return &Daemon{ - client: client, - sriovClient: sriovClient, - kubeClient: kubeClient, - HostHelpers: hostHelpers, - platformHelpers: platformHelper, - exitCh: exitCh, - stopCh: stopCh, - syncCh: syncCh, - refreshCh: refreshCh, - desiredNodeState: &sriovnetworkv1.SriovNetworkNodeState{}, - currentNodeState: &sriovnetworkv1.SriovNetworkNodeState{}, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter( - &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(updateDelay), 1)}, - workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, maxUpdateBackoff)), "SriovNetworkNodeState"), - eventRecorder: er, - featureGate: featureGates, - disabledPlugins: disabledPlugins, - mu: &sync.Mutex{}, +) *DaemonReconcile { + return &DaemonReconcile{ + client: client, + sriovClient: sriovClient, + kubeClient: kubeClient, + HostHelpers: hostHelpers, + platformHelpers: platformHelper, + + lastAppliedGeneration: 0, + eventRecorder: er, + featureGate: featureGates, + disabledPlugins: disabledPlugins, } } -// Run the config daemon -func (dn *Daemon) Run(stopCh <-chan struct{}, exitCh <-chan error) error { - log.Log.V(0).Info("Run()", "node", vars.NodeName) - - if vars.ClusterType == consts.ClusterTypeOpenshift { - log.Log.V(0).Info("Run(): start daemon.", "openshiftFlavor", dn.platformHelpers.GetFlavor()) - } else { - log.Log.V(0).Info("Run(): start daemon.") - } +func (dn *DaemonReconcile) DaemonInitialization() error { + funcLog := log.Log.WithName("DaemonInitialization") + var err error if !vars.UsingSystemdMode { - log.Log.V(0).Info("Run(): daemon running in daemon mode") - dn.HostHelpers.CheckRDMAEnabled() + funcLog.V(0).Info("daemon running in daemon mode") + _, err = dn.HostHelpers.CheckRDMAEnabled() + if err != nil { + funcLog.Error(err, "warning, failed to check RDMA state") + } dn.HostHelpers.TryEnableTun() dn.HostHelpers.TryEnableVhostNet() - err := systemd.CleanSriovFilesFromHost(vars.ClusterType == consts.ClusterTypeOpenshift) + err = systemd.CleanSriovFilesFromHost(vars.ClusterType == consts.ClusterTypeOpenshift) if err != nil { - log.Log.Error(err, "failed to remove all the systemd sriov files") + funcLog.Error(err, "failed to remove all the systemd sriov files") } } else { - log.Log.V(0).Info("Run(): daemon running in systemd mode") + funcLog.V(0).Info("Run(): daemon running in systemd mode") } - // Only watch own SriovNetworkNodeState CR - defer utilruntime.HandleCrash() - defer dn.workqueue.ShutDown() - if err := dn.prepareNMUdevRule(); err != nil { - log.Log.Error(err, "failed to prepare udev files to disable network manager on requested VFs") + funcLog.Error(err, "failed to prepare udev files to disable network manager on requested VFs") } if err := dn.HostHelpers.PrepareVFRepUdevRule(); err != nil { - log.Log.Error(err, "failed to prepare udev files to rename VF representors for requested VFs") - } - - var timeout int64 = 5 - var metadataKey = "metadata.name" - informerFactory := sninformer.NewFilteredSharedInformerFactory(dn.sriovClient, - time.Second*15, - vars.Namespace, - func(lo *metav1.ListOptions) { - lo.FieldSelector = metadataKey + "=" + vars.NodeName - lo.TimeoutSeconds = &timeout - }, - ) - - informer := informerFactory.Sriovnetwork().V1().SriovNetworkNodeStates().Informer() - informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: dn.enqueueNodeState, - UpdateFunc: func(old, new interface{}) { - dn.enqueueNodeState(new) - }, - }) - - cfgInformerFactory := sninformer.NewFilteredSharedInformerFactory(dn.sriovClient, - time.Second*30, - vars.Namespace, - func(lo *metav1.ListOptions) { - lo.FieldSelector = metadataKey + "=" + "default" - }, - ) - - cfgInformer := cfgInformerFactory.Sriovnetwork().V1().SriovOperatorConfigs().Informer() - cfgInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: dn.operatorConfigAddHandler, - UpdateFunc: dn.operatorConfigChangeHandler, - }) - - rand.Seed(time.Now().UnixNano()) - go cfgInformer.Run(dn.stopCh) - time.Sleep(5 * time.Second) - go informer.Run(dn.stopCh) - if ok := cache.WaitForCacheSync(stopCh, cfgInformer.HasSynced, informer.HasSynced); !ok { - return fmt.Errorf("failed to wait for caches to sync") + funcLog.Error(err, "failed to prepare udev files to rename VF representors for requested VFs") } - log.Log.Info("Starting workers") - // Launch one worker to process - go wait.Until(dn.runWorker, time.Second, stopCh) - log.Log.Info("Started workers") + ns := &sriovnetworkv1.SriovNetworkNodeState{} + // init openstack info + if vars.PlatformType == consts.VirtualOpenStack { + ns, err = dn.HostHelpers.GetCheckPointNodeState() + if err != nil { + return err + } - for { - select { - case <-stopCh: - log.Log.V(0).Info("Run(): stop daemon") - return nil - case err, more := <-exitCh: - log.Log.Error(err, "got an error") - if more { - dn.refreshCh <- Message{ - syncStatus: consts.SyncStatusFailed, - lastSyncError: err.Error(), - } + if ns == nil { + err = dn.platformHelpers.CreateOpenstackDevicesInfo() + if err != nil { + return err } - return err + } else { + dn.platformHelpers.CreateOpenstackDevicesInfoFromNodeStatus(ns) } } -} -func (dn *Daemon) runWorker() { - for dn.processNextWorkItem() { + // get interfaces + err = dn.getHostNetworkStatus(ns) + if err != nil { + funcLog.Error(err, "failed to get host network status on init") + return err } -} -func (dn *Daemon) enqueueNodeState(obj interface{}) { - var ns *sriovnetworkv1.SriovNetworkNodeState - var ok bool - if ns, ok = obj.(*sriovnetworkv1.SriovNetworkNodeState); !ok { - utilruntime.HandleError(fmt.Errorf("expected SriovNetworkNodeState but got %#v", obj)) - return + // init vendor plugins + dn.loadedPlugins, err = loadPlugins(ns, dn.HostHelpers, dn.disabledPlugins) + if err != nil { + funcLog.Error(err, "failed to enable vendor plugins") + return err } - key := ns.GetGeneration() - dn.workqueue.Add(key) -} - -func (dn *Daemon) processNextWorkItem() bool { - log.Log.V(2).Info("processNextWorkItem", "worker-queue-size", dn.workqueue.Len()) - obj, shutdown := dn.workqueue.Get() - if shutdown { - return false - } - - log.Log.V(2).Info("get item from queue", "item", obj.(int64)) - - // We wrap this block in a func so we can defer c.workqueue.Done. - err := func(obj interface{}) error { - // We call Done here so the workqueue knows we have finished - // processing this item. - defer dn.workqueue.Done(obj) - var key int64 - var ok bool - if key, ok = obj.(int64); !ok { - // As the item in the workqueue is actually invalid, we call - // Forget here. - dn.workqueue.Forget(obj) - utilruntime.HandleError(fmt.Errorf("expected workItem in workqueue but got %#v", obj)) - return nil - } - - err := dn.nodeStateSyncHandler() - if err != nil { - // Ereport error message, and put the item back to work queue for retry. - dn.refreshCh <- Message{ - syncStatus: consts.SyncStatusFailed, - lastSyncError: err.Error(), - } - <-dn.syncCh - dn.workqueue.AddRateLimited(key) - return fmt.Errorf("error syncing: %s, requeuing", err.Error()) - } - // Finally, if no error occurs we Forget this item so it does not - // get queued again until another change happens. - dn.workqueue.Forget(obj) - log.Log.Info("Successfully synced") - return nil - }(obj) + // save init state + err = dn.HostHelpers.WriteCheckpointFile(ns) if err != nil { - utilruntime.HandleError(err) + funcLog.Error(err, "failed to write checkpoint file on host") } - - return true -} - -func (dn *Daemon) operatorConfigAddHandler(obj interface{}) { - dn.operatorConfigChangeHandler(&sriovnetworkv1.SriovOperatorConfig{}, obj) + return nil } -func (dn *Daemon) operatorConfigChangeHandler(old, new interface{}) { - oldCfg := old.(*sriovnetworkv1.SriovOperatorConfig) - newCfg := new.(*sriovnetworkv1.SriovOperatorConfig) - if newCfg.Namespace != vars.Namespace || newCfg.Name != consts.DefaultConfigName { - log.Log.V(2).Info("unsupported SriovOperatorConfig", "namespace", newCfg.Namespace, "name", newCfg.Name) - return +func (dn *DaemonReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + reqLogger := log.FromContext(ctx).WithName("Reconcile") + // Get the latest NodeState + desiredNodeState := &sriovnetworkv1.SriovNetworkNodeState{} + err := dn.client.Get(ctx, client.ObjectKey{Namespace: req.Namespace, Name: req.Name}, desiredNodeState) + if err != nil { + if errors.IsNotFound(err) { + reqLogger.Info("NodeState doesn't exist") + return ctrl.Result{}, nil + } + reqLogger.Error(err, "Failed to fetch node state", "name", vars.NodeName) + return ctrl.Result{}, err } - snolog.SetLogLevel(newCfg.Spec.LogLevel) - - newDisableDrain := newCfg.Spec.DisableDrain - if dn.disableDrain != newDisableDrain { - dn.disableDrain = newDisableDrain - log.Log.Info("Set Disable Drain", "value", dn.disableDrain) + // Check the object as the drain controller annotations + // if not just wait for the drain controller to add them before we start taking care of the nodeState + if !utils.ObjectHasAnnotationKey(desiredNodeState, consts.NodeStateDrainAnnotationCurrent) || + !utils.ObjectHasAnnotationKey(desiredNodeState, consts.NodeStateDrainAnnotation) { + reqLogger.V(2).Info("NodeState doesn't have the current drain annotation") + return ctrl.Result{}, nil } - if !reflect.DeepEqual(oldCfg.Spec.FeatureGates, newCfg.Spec.FeatureGates) { - dn.featureGate.Init(newCfg.Spec.FeatureGates) - log.Log.Info("Updated featureGates", "featureGates", dn.featureGate.String()) - } + latest := desiredNodeState.GetGeneration() + current := desiredNodeState.DeepCopy() + reqLogger.V(0).Info("new generation", "generation", latest) - vars.MlxPluginFwReset = dn.featureGate.IsEnabled(consts.MellanoxFirmwareResetFeatureGate) -} + // Update the nodeState Status object with the existing network state (interfaces bridges and rdma status) + err = dn.getHostNetworkStatus(desiredNodeState) + if err != nil { + reqLogger.Error(err, "failed to get host network status") + return ctrl.Result{}, err + } -func (dn *Daemon) nodeStateSyncHandler() error { - var err error - // Get the latest NodeState - var sriovResult = &systemd.SriovResult{SyncStatus: consts.SyncStatusSucceeded, LastSyncError: ""} - dn.desiredNodeState, err = dn.sriovClient.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).Get(context.Background(), vars.NodeName, metav1.GetOptions{}) + // if we are running in systemd mode we want to get the sriov result from the config-daemon that runs in systemd + sriovResult, exist, err := dn.checkSystemdStatus(ctx, desiredNodeState) + //TODO: in the case we need to think what to do if we try to apply again or not if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): Failed to fetch node state", "name", vars.NodeName) - return err + reqLogger.Error(err, "failed to check systemd status unexpected error") + return ctrl.Result{}, nil } - latest := dn.desiredNodeState.GetGeneration() - log.Log.V(0).Info("nodeStateSyncHandler(): new generation", "generation", latest) - // load plugins if it has not loaded - if len(dn.loadedPlugins) == 0 { - dn.loadedPlugins, err = loadPlugins(dn.desiredNodeState, dn.HostHelpers, dn.disabledPlugins) + // if we are on the latest generation make a refresh on the nics + if dn.lastAppliedGeneration == latest { + isDrifted, err := dn.checkHostStateDrift(ctx, desiredNodeState) if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): failed to enable vendor plugins") - return err + reqLogger.Error(err, "failed to refresh host state") + return ctrl.Result{}, err } - } - skipReconciliation := true - // if the operator complete the drain operator we should continue the configuration - if !dn.isDrainCompleted() { - if vars.UsingSystemdMode && dn.currentNodeState.GetGeneration() == latest { - serviceEnabled, err := dn.HostHelpers.IsServiceEnabled(systemd.SriovServicePath) - if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): failed to check if sriov-config service exist on host") - return err - } - postNetworkServiceEnabled, err := dn.HostHelpers.IsServiceEnabled(systemd.SriovPostNetworkServicePath) + // if there are no node drifted changes, and we are on the latest applied policy + // we check if we need to publish a new nodeState status if not we requeue + if !isDrifted { + shouldUpdate, err := dn.shouldUpdateStatus(current, desiredNodeState) if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): failed to check if sriov-config-post-network service exist on host") - return err + reqLogger.Error(err, "failed to check host state") + return ctrl.Result{}, err } - // if the service doesn't exist we should continue to let the k8s plugin to create the service files - // this is only for k8s base environments, for openshift the sriov-operator creates a machine config to will apply - // the system service and reboot the node the config-daemon doesn't need to do anything. - if !(serviceEnabled && postNetworkServiceEnabled) { - sriovResult = &systemd.SriovResult{SyncStatus: consts.SyncStatusFailed, - LastSyncError: fmt.Sprintf("some sriov systemd services are not available on node: "+ - "sriov-config available:%t, sriov-config-post-network available:%t", serviceEnabled, postNetworkServiceEnabled)} - } else { - sriovResult, err = systemd.ReadSriovResult() + if shouldUpdate { + reqLogger.Info("updating nodeState with new host status") + err = dn.updateSyncState(ctx, desiredNodeState, desiredNodeState.Status.SyncStatus, desiredNodeState.Status.LastSyncError) if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): failed to load sriov result file from host") - return err + reqLogger.Error(err, "failed to update nodeState new host status") + return ctrl.Result{}, err } } - if sriovResult.LastSyncError != "" || sriovResult.SyncStatus == consts.SyncStatusFailed { - log.Log.Info("nodeStateSyncHandler(): sync failed systemd service error", "last-sync-error", sriovResult.LastSyncError) - // add the error but don't requeue - dn.refreshCh <- Message{ - syncStatus: consts.SyncStatusFailed, - lastSyncError: sriovResult.LastSyncError, - } - <-dn.syncCh - return nil - } + return ctrl.Result{RequeueAfter: 30 * time.Second}, nil } + } - skipReconciliation, err = dn.shouldSkipReconciliation(dn.desiredNodeState) + // set sync state to inProgress, but we don't clear the failed status + err = dn.updateSyncState(ctx, desiredNodeState, consts.SyncStatusInProgress, desiredNodeState.Status.LastSyncError) + if err != nil { + reqLogger.Error(err, "failed to update sync status to inProgress") + return ctrl.Result{}, err + } + + reqReboot, reqDrain, err := dn.checkOnNodeStateChange(desiredNodeState) + if err != nil { + return ctrl.Result{}, err + } + + if vars.UsingSystemdMode { + // When running using systemd check if the applied configuration is the latest one + // or there is a new config we need to apply + // When using systemd configuration we write the file + systemdConfModified, err := dn.writeSystemdConfigFile(desiredNodeState) if err != nil { - return err + reqLogger.Error(err, "failed to write systemd config file") + return ctrl.Result{}, err } + reqDrain = reqDrain || systemdConfModified || !exist + // require reboot if drain needed for systemd mode + reqReboot = reqReboot || systemdConfModified || reqDrain || !exist } - // we are done with the configuration just return here - if dn.currentNodeState.GetGeneration() == dn.desiredNodeState.GetGeneration() && - dn.desiredNodeState.Status.SyncStatus == consts.SyncStatusSucceeded && skipReconciliation { - log.Log.Info("Current state and desire state are equal together with sync status succeeded nothing to do") - return nil - } + reqLogger.V(0).Info("aggregated daemon node state requirement", + "drain-required", reqDrain, "reboot-required", reqReboot, "disable-drain", dn.disableDrain) - dn.refreshCh <- Message{ - syncStatus: consts.SyncStatusInProgress, - lastSyncError: "", + // handle drain only if the plugins request drain, or we are already in a draining request state + if reqDrain || + !utils.ObjectHasAnnotation(desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.DrainIdle) { + drainInProcess, err := dn.handleDrain(ctx, desiredNodeState, reqReboot) + if err != nil { + reqLogger.Error(err, "failed to handle drain") + return ctrl.Result{}, err + } + // drain is still in progress we don't need to re-queue the request as the operator will update the annotation + if drainInProcess { + return ctrl.Result{}, nil + } } - // wait for writer to refresh status then pull again the latest node state - <-dn.syncCh - // we need to load the latest status to our object - // if we don't do it we can have a race here where the user remove the virtual functions but the operator didn't - // trigger the refresh - updatedState, err := dn.sriovClient.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).Get(context.Background(), vars.NodeName, metav1.GetOptions{}) - if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): Failed to fetch node state", "name", vars.NodeName) - return err + // if we finish the drain we should run apply here + if dn.isDrainCompleted(reqDrain, desiredNodeState) { + return dn.apply(ctx, desiredNodeState, reqReboot, sriovResult) } - dn.desiredNodeState.Status = updatedState.Status + return ctrl.Result{}, nil +} + +func (dn *DaemonReconcile) checkOnNodeStateChange(desiredNodeState *sriovnetworkv1.SriovNetworkNodeState) (bool, bool, error) { + funcLog := log.Log.WithName("checkOnNodeStateChange") reqReboot := false reqDrain := false // check if any of the plugins required to drain or reboot the node for k, p := range dn.loadedPlugins { - d, r := false, false - if dn.currentNodeState.GetName() == "" { - log.Log.V(0).Info("nodeStateSyncHandler(): calling OnNodeStateChange for a new node state") - } else { - log.Log.V(0).Info("nodeStateSyncHandler(): calling OnNodeStateChange for an updated node state") - } - d, r, err = p.OnNodeStateChange(dn.desiredNodeState) + d, r, err := p.OnNodeStateChange(desiredNodeState) if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): OnNodeStateChange plugin error", "plugin-name", k) - return err + funcLog.Error(err, "OnNodeStateChange plugin error", "plugin-name", k) + return false, false, err } - log.Log.V(0).Info("nodeStateSyncHandler(): OnNodeStateChange result", "plugin", k, "drain-required", d, "reboot-required", r) + funcLog.V(0).Info("OnNodeStateChange result", + "plugin", k, + "drain-required", d, + "reboot-required", r) reqDrain = reqDrain || d reqReboot = reqReboot || r } - // When running using systemd check if the applied configuration is the latest one - // or there is a new config we need to apply - // When using systemd configuration we write the file - if vars.UsingSystemdMode { - log.Log.V(0).Info("nodeStateSyncHandler(): writing systemd config file to host") - systemdConfModified, err := systemd.WriteConfFile(dn.desiredNodeState) - if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): failed to write configuration file for systemd mode") - return err - } - if systemdConfModified { - // remove existing result file to make sure that we will not use outdated result, e.g. in case if - // systemd service was not triggered for some reason - err = systemd.RemoveSriovResult() - if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): failed to remove result file for systemd mode") - return err - } - } - reqDrain = reqDrain || systemdConfModified - // require reboot if drain needed for systemd mode - reqReboot = reqReboot || systemdConfModified || reqDrain - log.Log.V(0).Info("nodeStateSyncHandler(): systemd mode WriteConfFile results", - "drain-required", reqDrain, "reboot-required", reqReboot, "disable-drain", dn.disableDrain) + return reqReboot, reqDrain, nil +} - err = systemd.WriteSriovSupportedNics() - if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): failed to write supported nic ids file for systemd mode") - return err - } +func (dn *DaemonReconcile) checkSystemdStatus(ctx context.Context, desiredNodeState *sriovnetworkv1.SriovNetworkNodeState) (*systemd.SriovResult, bool, error) { + if !vars.UsingSystemdMode { + return nil, false, nil } - log.Log.V(0).Info("nodeStateSyncHandler(): aggregated daemon", - "drain-required", reqDrain, "reboot-required", reqReboot, "disable-drain", dn.disableDrain) + funcLog := log.Log.WithName("checkSystemdStatus") + serviceEnabled, err := dn.HostHelpers.IsServiceEnabled(systemd.SriovServicePath) + if err != nil { + funcLog.Error(err, "failed to check if sriov-config service exist on host") + return nil, false, err + } + postNetworkServiceEnabled, err := dn.HostHelpers.IsServiceEnabled(systemd.SriovPostNetworkServicePath) + if err != nil { + funcLog.Error(err, "failed to check if sriov-config-post-network service exist on host") + return nil, false, err + } - // handle drain only if the plugin request drain, or we are already in a draining request state - if reqDrain || !utils.ObjectHasAnnotation(dn.desiredNodeState, - consts.NodeStateDrainAnnotationCurrent, - consts.DrainIdle) { - drainInProcess, err := dn.handleDrain(reqReboot) + // if the service doesn't exist we should continue to let the k8s plugin to create the service files + // this is only for k8s base environments, for openshift the sriov-operator creates a machine config to will apply + // the system service and reboot the node the config-daemon doesn't need to do anything. + sriovResult := &systemd.SriovResult{SyncStatus: consts.SyncStatusFailed, + LastSyncError: fmt.Sprintf("some sriov systemd services are not available on node: "+ + "sriov-config available:%t, sriov-config-post-network available:%t", serviceEnabled, postNetworkServiceEnabled)} + exist := false + + // check if the service exist + if serviceEnabled && postNetworkServiceEnabled { + sriovResult, exist, err = systemd.ReadSriovResult() if err != nil { - log.Log.Error(err, "failed to handle drain") - return err - } - if drainInProcess { - return nil + funcLog.Error(err, "failed to load sriov result file from host") + return nil, false, err } } + //// only if something is not equal we apply if not we continue to check if something change on the node, + //// and we need to trigger a reconfiguration + //if desiredNodeState.Status.SyncStatus != sriovResult.SyncStatus || + // desiredNodeState.Status.LastSyncError != sriovResult.LastSyncError { + // err = dn.updateSyncState(ctx, desiredNodeState, sriovResult.SyncStatus, sriovResult.LastSyncError) + // if err != nil { + // funcLog.Error(err, "failed to update sync status") + // } + // return sriovResult, err + //} + + // TODO: check if we need this + //if sriovResult.LastSyncError != "" || sriovResult.SyncStatus == consts.SyncStatusFailed { + // funcLog.Info("sync failed systemd service error", "last-sync-error", sriovResult.LastSyncError) + // err = dn.updateSyncState(ctx, desiredNodeState, consts.SyncStatusFailed, sriovResult.LastSyncError) + // if err != nil { + // return nil, false, err + // } + // dn.lastAppliedGeneration = desiredNodeState.Generation + // return sriovResult, true, err + //} + return sriovResult, exist, nil +} + +func (dn *DaemonReconcile) apply(ctx context.Context, desiredNodeState *sriovnetworkv1.SriovNetworkNodeState, reqReboot bool, sriovResult *systemd.SriovResult) (ctrl.Result, error) { + reqLogger := log.FromContext(ctx).WithName("Apply") // apply the vendor plugins after we are done with drain if needed for k, p := range dn.loadedPlugins { // Skip both the general and virtual plugin apply them last if k != GenericPluginName && k != VirtualPluginName { err := p.Apply() if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): plugin Apply failed", "plugin-name", k) - return err + reqLogger.Error(err, "plugin Apply failed", "plugin-name", k) + return ctrl.Result{}, err } } } @@ -496,10 +364,10 @@ func (dn *Daemon) nodeStateSyncHandler() error { selectedPlugin, ok := dn.loadedPlugins[GenericPluginName] if ok { // Apply generic plugin last - err = selectedPlugin.Apply() + err := selectedPlugin.Apply() if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): generic plugin fail to apply") - return err + reqLogger.Error(err, "generic plugin fail to apply") + return ctrl.Result{}, err } } @@ -507,179 +375,158 @@ func (dn *Daemon) nodeStateSyncHandler() error { selectedPlugin, ok = dn.loadedPlugins[VirtualPluginName] if ok { // Apply virtual plugin last - err = selectedPlugin.Apply() + err := selectedPlugin.Apply() if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): virtual plugin failed to apply") - return err + reqLogger.Error(err, "virtual plugin failed to apply") + return ctrl.Result{}, err } } } if reqReboot { - log.Log.Info("nodeStateSyncHandler(): reboot node") - dn.eventRecorder.SendEvent("RebootNode", "Reboot node has been initiated") - dn.rebootNode() - return nil + reqLogger.Info("reboot node") + dn.eventRecorder.SendEvent(ctx, "RebootNode", "Reboot node has been initiated") + return ctrl.Result{}, dn.rebootNode() } - // restart device plugin pod - log.Log.Info("nodeStateSyncHandler(): restart device plugin pod") - if err := dn.restartDevicePluginPod(); err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): fail to restart device plugin pod") - return err + if err := dn.restartDevicePluginPod(ctx); err != nil { + reqLogger.Error(err, "failed to restart device plugin on the node") + return ctrl.Result{}, err } - log.Log.Info("nodeStateSyncHandler(): apply 'Idle' annotation for node") - err = utils.AnnotateNode(context.Background(), vars.NodeName, consts.NodeDrainAnnotation, consts.DrainIdle, dn.client) + _, err := dn.annotate(ctx, desiredNodeState, consts.DrainIdle) if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): Failed to annotate node") - return err + reqLogger.Error(err, "failed to request annotation update to idle") + return ctrl.Result{}, err } - log.Log.Info("nodeStateSyncHandler(): apply 'Idle' annotation for nodeState") - if err := utils.AnnotateObject(context.Background(), dn.desiredNodeState, - consts.NodeStateDrainAnnotation, - consts.DrainIdle, dn.client); err != nil { - return err + reqLogger.Info("sync succeeded") + syncStatus := consts.SyncStatusSucceeded + lastSyncError := "" + if vars.UsingSystemdMode { + syncStatus = sriovResult.SyncStatus + lastSyncError = sriovResult.LastSyncError } - log.Log.Info("nodeStateSyncHandler(): sync succeeded") - dn.currentNodeState = dn.desiredNodeState.DeepCopy() - if vars.UsingSystemdMode { - dn.refreshCh <- Message{ - syncStatus: sriovResult.SyncStatus, - lastSyncError: sriovResult.LastSyncError, - } - } else { - dn.refreshCh <- Message{ - syncStatus: consts.SyncStatusSucceeded, - lastSyncError: "", - } + // Update the nodeState Status object with the existing network interfaces + err = dn.getHostNetworkStatus(desiredNodeState) + if err != nil { + reqLogger.Error(err, "failed to get host network status") + return ctrl.Result{}, err } - // wait for writer to refresh the status - <-dn.syncCh - return nil + + err = dn.updateSyncState(ctx, desiredNodeState, syncStatus, lastSyncError) + if err != nil { + reqLogger.Error(err, "failed to update sync status") + return ctrl.Result{}, err + } + + // update the lastAppliedGeneration + dn.lastAppliedGeneration = desiredNodeState.Generation + return ctrl.Result{RequeueAfter: 30 * time.Second}, nil } -func (dn *Daemon) shouldSkipReconciliation(latestState *sriovnetworkv1.SriovNetworkNodeState) (bool, error) { - log.Log.V(0).Info("shouldSkipReconciliation()") - var err error +// checkHostStateDrift returns true if the node state drifted from the nodeState policy +// Check if there is a change in the host network interfaces that require a reconfiguration by the daemon +func (dn *DaemonReconcile) checkHostStateDrift(ctx context.Context, desiredNodeState *sriovnetworkv1.SriovNetworkNodeState) (bool, error) { + funcLog := log.Log.WithName("checkHostStateDrift()") // Skip when SriovNetworkNodeState object has just been created. - if latestState.GetGeneration() == 1 && len(latestState.Spec.Interfaces) == 0 { - err = dn.HostHelpers.ClearPCIAddressFolder() + if desiredNodeState.GetGeneration() == 1 && len(desiredNodeState.Spec.Interfaces) == 0 { + err := dn.HostHelpers.ClearPCIAddressFolder() if err != nil { - log.Log.Error(err, "failed to clear the PCI address configuration") + funcLog.Error(err, "failed to clear the PCI address configuration") return false, err } - log.Log.V(0).Info( - "shouldSkipReconciliation(): interface policy spec not yet set by controller for sriovNetworkNodeState", - "name", latestState.Name) - if latestState.Status.SyncStatus != consts.SyncStatusSucceeded { - dn.refreshCh <- Message{ - syncStatus: consts.SyncStatusSucceeded, - lastSyncError: "", - } - // wait for writer to refresh status - <-dn.syncCh + funcLog.V(0).Info("interface policy spec not yet set by controller for sriovNetworkNodeState", + "name", desiredNodeState.Name) + if desiredNodeState.Status.SyncStatus != consts.SyncStatusSucceeded || + desiredNodeState.Status.LastSyncError != "" { + err = dn.updateSyncState(ctx, desiredNodeState, consts.SyncStatusSucceeded, "") } - return true, nil + return false, err } // Verify changes in the status of the SriovNetworkNodeState CR. - if dn.currentNodeState.GetGeneration() == latestState.GetGeneration() { - log.Log.V(0).Info("shouldSkipReconciliation() verifying status change") - for _, p := range dn.loadedPlugins { - // Verify changes in the status of the SriovNetworkNodeState CR. - log.Log.V(0).Info("shouldSkipReconciliation(): verifying status change for plugin", "pluginName", p.Name()) - changed, err := p.CheckStatusChanges(latestState) - if err != nil { - return false, err - } - if changed { - log.Log.V(0).Info("shouldSkipReconciliation(): plugin require change", "pluginName", p.Name()) - return false, nil - } + log.Log.V(0).Info("verifying interfaces status change") + for _, p := range dn.loadedPlugins { + // Verify changes in the status of the SriovNetworkNodeState CR. + log.Log.V(2).Info("verifying status change for plugin", "pluginName", p.Name()) + changed, err := p.CheckStatusChanges(desiredNodeState) + if err != nil { + return false, err } + if changed { + log.Log.V(0).Info("plugin require change", "pluginName", p.Name()) + return true, nil + } + } - log.Log.V(0).Info("shouldSkipReconciliation(): Interface not changed") - if latestState.Status.LastSyncError != "" || - latestState.Status.SyncStatus != consts.SyncStatusSucceeded { - dn.refreshCh <- Message{ - syncStatus: consts.SyncStatusSucceeded, - lastSyncError: "", - } - // wait for writer to refresh the status - <-dn.syncCh + log.Log.V(0).Info("Interfaces not changed") + return false, nil +} + +func (dn *DaemonReconcile) writeSystemdConfigFile(desiredNodeState *sriovnetworkv1.SriovNetworkNodeState) (bool, error) { + funcLog := log.Log.WithName("writeSystemdConfigFile()") + funcLog.V(0).Info("writing systemd config file to host") + systemdConfModified, err := systemd.WriteConfFile(desiredNodeState) + if err != nil { + funcLog.Error(err, "failed to write configuration file for systemd mode") + return false, err + } + if systemdConfModified { + // remove existing result file to make sure that we will not use outdated result, e.g. in case if + // systemd service was not triggered for some reason + err = systemd.RemoveSriovResult() + if err != nil { + funcLog.Error(err, "failed to remove result file for systemd mode") + return false, err } + } - return true, nil + err = systemd.WriteSriovSupportedNics() + if err != nil { + funcLog.Error(err, "failed to write supported nic ids file for systemd mode") + return false, err } - return false, nil + funcLog.V(0).Info("systemd mode WriteConfFile results", + "drain-required", systemdConfModified, "reboot-required", systemdConfModified) + return systemdConfModified, nil } // handleDrain: adds the right annotation to the node and nodeState object // returns true if we need to finish the reconcile loop and wait for a new object -func (dn *Daemon) handleDrain(reqReboot bool) (bool, error) { +func (dn *DaemonReconcile) handleDrain(ctx context.Context, desiredNodeState *sriovnetworkv1.SriovNetworkNodeState, reqReboot bool) (bool, error) { + funcLog := log.Log.WithName("handleDrain") // done with the drain we can continue with the configuration - if utils.ObjectHasAnnotation(dn.desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.DrainComplete) { - log.Log.Info("handleDrain(): the node complete the draining") + if utils.ObjectHasAnnotation(desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.DrainComplete) { + funcLog.Info("the node complete the draining") return false, nil } // the operator is still draining the node so we reconcile - if utils.ObjectHasAnnotation(dn.desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.Draining) { - log.Log.Info("handleDrain(): the node is still draining") + if utils.ObjectHasAnnotation(desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.Draining) { + funcLog.Info("the node is still draining") return true, nil } // drain is disabled we continue with the configuration if dn.disableDrain { - log.Log.Info("handleDrain(): drain is disabled in sriovOperatorConfig") + funcLog.Info("drain is disabled in sriovOperatorConfig") return false, nil } + // annotate both node and node state with drain or reboot + annotation := consts.DrainRequired if reqReboot { - log.Log.Info("handleDrain(): apply 'Reboot_Required' annotation for node") - err := utils.AnnotateNode(context.Background(), vars.NodeName, consts.NodeDrainAnnotation, consts.RebootRequired, dn.client) - if err != nil { - log.Log.Error(err, "applyDrainRequired(): Failed to annotate node") - return false, err - } - - log.Log.Info("handleDrain(): apply 'Reboot_Required' annotation for nodeState") - if err := utils.AnnotateObject(context.Background(), dn.desiredNodeState, - consts.NodeStateDrainAnnotation, - consts.RebootRequired, dn.client); err != nil { - return false, err - } - - // the node was annotated we need to wait for the operator to finish the drain - return true, nil - } - log.Log.Info("handleDrain(): apply 'Drain_Required' annotation for node") - err := utils.AnnotateNode(context.Background(), vars.NodeName, consts.NodeDrainAnnotation, consts.DrainRequired, dn.client) - if err != nil { - log.Log.Error(err, "handleDrain(): Failed to annotate node") - return false, err + annotation = consts.RebootRequired } - - log.Log.Info("handleDrain(): apply 'Drain_Required' annotation for nodeState") - if err := utils.AnnotateObject(context.Background(), dn.desiredNodeState, - consts.NodeStateDrainAnnotation, - consts.DrainRequired, dn.client); err != nil { - return false, err - } - - // the node was annotated we need to wait for the operator to finish the drain - return true, nil + return dn.annotate(ctx, desiredNodeState, annotation) } -func (dn *Daemon) restartDevicePluginPod() error { - dn.mu.Lock() - defer dn.mu.Unlock() +func (dn *DaemonReconcile) restartDevicePluginPod(ctx context.Context) error { log.Log.V(2).Info("restartDevicePluginPod(): try to restart device plugin pod") pods, err := dn.kubeClient.CoreV1().Pods(vars.Namespace).List(context.Background(), metav1.ListOptions{ @@ -714,8 +561,8 @@ func (dn *Daemon) restartDevicePluginPod() error { return err } - if err := wait.PollImmediateUntil(3*time.Second, func() (bool, error) { - _, err := dn.kubeClient.CoreV1().Pods(vars.Namespace).Get(context.Background(), podToDelete, metav1.GetOptions{}) + if err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(ctx context.Context) (bool, error) { + _, err := dn.kubeClient.CoreV1().Pods(vars.Namespace).Get(ctx, podToDelete, metav1.GetOptions{}) if errors.IsNotFound(err) { log.Log.Info("restartDevicePluginPod(): device plugin pod exited") return true, nil @@ -727,7 +574,7 @@ func (dn *Daemon) restartDevicePluginPod() error { log.Log.Info("restartDevicePluginPod(): waiting for device plugin pod to exit", "pod-name", podToDelete) } return false, nil - }, dn.stopCh); err != nil { + }); err != nil { log.Log.Error(err, "restartDevicePluginPod(): failed to wait for checking pod deletion") return err } @@ -736,11 +583,13 @@ func (dn *Daemon) restartDevicePluginPod() error { return nil } -func (dn *Daemon) rebootNode() { - log.Log.Info("rebootNode(): trigger node reboot") +func (dn *DaemonReconcile) rebootNode() error { + funcLog := log.Log.WithName("rebootNode") + funcLog.Info("trigger node reboot") exit, err := dn.HostHelpers.Chroot(consts.Host) if err != nil { - log.Log.Error(err, "rebootNode(): chroot command failed") + funcLog.Error(err, "chroot command failed") + return err } defer exit() // creates a new transient systemd unit to reboot the system. @@ -754,11 +603,13 @@ func (dn *Daemon) rebootNode() { "--description", "sriov-network-config-daemon reboot node", "/bin/sh", "-c", "systemctl stop kubelet.service; reboot") if err != nil { - log.Log.Error(err, "failed to reboot node", "stdOut", stdOut, "StdErr", StdErr) + funcLog.Error(err, "failed to reboot node", "stdOut", stdOut, "StdErr", StdErr) + return err } + return nil } -func (dn *Daemon) prepareNMUdevRule() error { +func (dn *DaemonReconcile) prepareNMUdevRule() error { // we need to remove the Red Hat Virtio network device from the udev rule configuration // if we don't remove it when running the config-daemon on a virtual node it will disconnect the node after a reboot // even that the operator should not be installed on virtual environments that are not openstack @@ -775,6 +626,57 @@ func (dn *Daemon) prepareNMUdevRule() error { } // isDrainCompleted returns true if the current-state annotation is drain completed -func (dn *Daemon) isDrainCompleted() bool { - return utils.ObjectHasAnnotation(dn.desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.DrainComplete) +func (dn *DaemonReconcile) isDrainCompleted(reqDrain bool, desiredNodeState *sriovnetworkv1.SriovNetworkNodeState) bool { + // if we need to drain check the drain status + if reqDrain { + return utils.ObjectHasAnnotation(desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.DrainComplete) + } + + // check in case a reboot was requested and the second run doesn't require a drain + if !utils.ObjectHasAnnotation(desiredNodeState, consts.NodeStateDrainAnnotation, consts.DrainIdle) { + return utils.ObjectHasAnnotation(desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.DrainComplete) + } + + // if we don't need to drain at all just return true so we can apply the configuration + return true +} + +func (dn *DaemonReconcile) annotate( + ctx context.Context, + desiredNodeState *sriovnetworkv1.SriovNetworkNodeState, + annotationState string) (bool, error) { + funcLog := log.Log.WithName("annotate") + + funcLog.Info(fmt.Sprintf("apply '%s' annotation for node", annotationState)) + err := utils.AnnotateNode(ctx, desiredNodeState.Name, consts.NodeDrainAnnotation, annotationState, dn.client) + if err != nil { + log.Log.Error(err, "Failed to annotate node") + return false, err + } + + funcLog.Info(fmt.Sprintf("apply '%s' annotation for nodeState", annotationState)) + if err := utils.AnnotateObject(context.Background(), desiredNodeState, + consts.NodeStateDrainAnnotation, + annotationState, dn.client); err != nil { + return false, err + } + + // the node was annotated we need to wait for the operator to finish the drain + return true, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (dn *DaemonReconcile) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&sriovnetworkv1.SriovNetworkNodeState{}). + WithEventFilter(predicate.Or(predicate.AnnotationChangedPredicate{}, predicate.GenerationChangedPredicate{})). + Complete(dn) +} + +// ------------------------------------- +// ---- unit tests helper function ----- +// ------------------------------------- + +func (dn *DaemonReconcile) GetLastAppliedGeneration() int64 { + return dn.lastAppliedGeneration } diff --git a/pkg/daemon/daemon_suite_test.go b/pkg/daemon/daemon_suite_test.go new file mode 100644 index 000000000..89432eae7 --- /dev/null +++ b/pkg/daemon/daemon_suite_test.go @@ -0,0 +1,135 @@ +package daemon_test + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + netattdefv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1" + openshiftconfigv1 "github.com/openshift/api/config/v1" + mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1" + monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" + "go.uber.org/zap/zapcore" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + + sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars" + "github.com/k8snetworkplumbingwg/sriov-network-operator/test/util" +) + +// These tests use Ginkgo (BDD-style Go testing framework). Refer to +// http://onsi.github.io/ginkgo/ to learn more about Ginkgo. + +var ( + k8sClient client.Client + testEnv *envtest.Environment + cfg *rest.Config +) + +// Define utility constants for object names and testing timeouts/durations and intervals. +const testNamespace = "openshift-sriov-network-operator" + +var _ = BeforeSuite(func() { + var err error + + logf.SetLogger(zap.New( + zap.WriteTo(GinkgoWriter), + zap.UseDevMode(true), + func(o *zap.Options) { + o.TimeEncoder = zapcore.RFC3339NanoTimeEncoder + })) + + // Go to project root directory + err = os.Chdir("../..") + Expect(err).NotTo(HaveOccurred()) + + By("bootstrapping test environment") + testEnv = &envtest.Environment{ + CRDDirectoryPaths: []string{filepath.Join("config", "crd", "bases"), filepath.Join("test", "util", "crds")}, + ErrorIfCRDPathMissing: true, + } + + testEnv.ControlPlane.GetAPIServer().Configure().Set("disable-admission-plugins", "MutatingAdmissionWebhook", "ValidatingAdmissionWebhook") + + cfg, err = testEnv.Start() + Expect(err).NotTo(HaveOccurred()) + Expect(cfg).NotTo(BeNil()) + + By("registering schemes") + err = sriovnetworkv1.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + err = netattdefv1.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + err = mcfgv1.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + err = openshiftconfigv1.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + err = monitoringv1.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + + vars.Config = cfg + vars.Scheme = scheme.Scheme + vars.Namespace = testNamespace + + By("creating K8s client") + k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) + Expect(err).NotTo(HaveOccurred()) + Expect(k8sClient).NotTo(BeNil()) + + By("creating default/common k8s objects for tests") + // Create test namespace + ns := &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: testNamespace, + }, + Spec: corev1.NamespaceSpec{}, + Status: corev1.NamespaceStatus{}, + } + Expect(k8sClient.Create(context.Background(), ns)).Should(Succeed()) + + sa := &corev1.ServiceAccount{TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + Namespace: testNamespace, + }} + Expect(k8sClient.Create(context.Background(), sa)).Should(Succeed()) + + // Create openshift Infrastructure + infra := &openshiftconfigv1.Infrastructure{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster", + }, + Spec: openshiftconfigv1.InfrastructureSpec{}, + Status: openshiftconfigv1.InfrastructureStatus{ + ControlPlaneTopology: openshiftconfigv1.HighlyAvailableTopologyMode, + }, + } + Expect(k8sClient.Create(context.Background(), infra)).Should(Succeed()) +}) + +var _ = AfterSuite(func() { + By("tearing down the test environment") + if testEnv != nil { + Eventually(func() error { + return testEnv.Stop() + }, util.APITimeout, time.Second).ShouldNot(HaveOccurred()) + } +}) + +func TestDaemon(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Daemon Suite") +} diff --git a/pkg/daemon/daemon_test.go b/pkg/daemon/daemon_test.go index 67a56633f..0792026c4 100644 --- a/pkg/daemon/daemon_test.go +++ b/pkg/daemon/daemon_test.go @@ -1,337 +1,315 @@ -package daemon +package daemon_test import ( "context" - "flag" - "testing" + "sync" + "time" - "github.com/golang/mock/gomock" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "go.uber.org/zap/zapcore" + + "github.com/golang/mock/gomock" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - fakek8s "k8s.io/client-go/kubernetes/fake" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" - kclient "sigs.k8s.io/controller-runtime/pkg/client/fake" - logf "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/log/zap" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" - snclient "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned" - snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned/fake" - "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" + snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned" + constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/daemon" "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/featuregate" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/helper" mock_helper "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/helper/mock" + snolog "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/log" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms" mock_platforms "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms/mock" - "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms/openshift" - plugin "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/plugins" - "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/plugins/fake" - "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/plugins/generic" "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars" - "github.com/k8snetworkplumbingwg/sriov-network-operator/test/util/fakefilesystem" ) -var SriovDevicePluginPod corev1.Pod - -func TestConfigDaemon(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Config Daemon Suite") -} - -var _ = BeforeSuite(func() { - // Increase verbosity to help debugging failures - flag.Set("logtostderr", "true") - flag.Set("stderrthreshold", "WARNING") - flag.Set("v", "2") - - logf.SetLogger(zap.New( - zap.WriteTo(GinkgoWriter), - zap.UseDevMode(true), - func(o *zap.Options) { - o.TimeEncoder = zapcore.RFC3339NanoTimeEncoder - })) -}) - -var _ = Describe("Config Daemon", func() { - var stopCh chan struct{} - var syncCh chan struct{} - var exitCh chan error - var refreshCh chan Message - - var cleanFakeFs func() +var ( + cancel context.CancelFunc + ctx context.Context + k8sManager manager.Manager + snclient *snclientset.Clientset + kubeclient *kubernetes.Clientset + eventRecorder *daemon.EventRecorder + wg sync.WaitGroup + startDaemon func(dc *daemon.DaemonReconcile) + + t FullGinkgoTInterface + mockCtrl *gomock.Controller + hostHelper *mock_helper.MockHostHelpersInterface + platformHelper *mock_platforms.MockInterface +) - var sut *Daemon +const ( + waitTime = 30 * time.Minute + retryTime = 5 * time.Second +) - BeforeEach(func() { - stopCh = make(chan struct{}) - refreshCh = make(chan Message) - exitCh = make(chan error) - syncCh = make(chan struct{}, 64) - - // Fill syncCh with values so daemon doesn't wait for a writer - for i := 0; i < 64; i++ { - syncCh <- struct{}{} +var _ = Describe("Daemon Controller", Ordered, func() { + BeforeAll(func() { + ctx, cancel = context.WithCancel(context.Background()) + wg = sync.WaitGroup{} + startDaemon = func(dc *daemon.DaemonReconcile) { + By("start controller manager") + wg.Add(1) + go func() { + defer wg.Done() + defer GinkgoRecover() + By("Start controller manager") + err := k8sManager.Start(ctx) + Expect(err).ToNot(HaveOccurred()) + }() } - // Create virtual filesystem for Daemon - fakeFs := &fakefilesystem.FS{ - Dirs: []string{ - "bindata/scripts", - "host/etc/sriov-operator", - "host/etc/sriov-operator/pci", - "host/etc/udev/rules.d", - }, - Symlinks: map[string]string{}, - Files: map[string][]byte{ - "/bindata/scripts/enable-rdma.sh": []byte(""), - "/bindata/scripts/load-kmod.sh": []byte(""), + Expect(k8sClient.DeleteAllOf(context.Background(), &sriovnetworkv1.SriovOperatorConfig{}, client.InNamespace(testNamespace))).ToNot(HaveOccurred()) + soc := &sriovnetworkv1.SriovOperatorConfig{ObjectMeta: metav1.ObjectMeta{ + Name: constants.DefaultConfigName, + Namespace: testNamespace, + }, + Spec: sriovnetworkv1.SriovOperatorConfigSpec{ + LogLevel: 2, }, } - - var err error - vars.FilesystemRoot, cleanFakeFs, err = fakeFs.Use() + err := k8sClient.Create(ctx, soc) Expect(err).ToNot(HaveOccurred()) - vars.UsingSystemdMode = false - vars.NodeName = "test-node" - vars.Namespace = "sriov-network-operator" - vars.PlatformType = consts.Baremetal + snclient = snclientset.NewForConfigOrDie(cfg) + kubeclient = kubernetes.NewForConfigOrDie(cfg) + eventRecorder = daemon.NewEventRecorder(snclient, kubeclient, scheme.Scheme) + DeferCleanup(func() { + eventRecorder.Shutdown() + }) - FakeSupportedNicIDs := corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: sriovnetworkv1.SupportedNicIDConfigmap, - Namespace: vars.Namespace, - }, - Data: map[string]string{ - "Intel_i40e_XXV710": "8086 158a 154c", - "Nvidia_mlx5_ConnectX-4": "15b3 1013 1014", - }, - } + snolog.SetLogLevel(2) + vars.ClusterType = constants.ClusterTypeOpenshift + }) - err = sriovnetworkv1.AddToScheme(scheme.Scheme) - Expect(err).ToNot(HaveOccurred()) - kClient := kclient.NewClientBuilder().WithScheme(scheme.Scheme).WithRuntimeObjects(&corev1.Node{ - ObjectMeta: metav1.ObjectMeta{Name: "test-node"}}, - &sriovnetworkv1.SriovNetworkNodeState{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node", - Namespace: vars.Namespace, - }}).Build() - - kubeClient := fakek8s.NewSimpleClientset(&FakeSupportedNicIDs) - snclient := snclientset.NewSimpleClientset() - err = sriovnetworkv1.InitNicIDMapFromConfigMap(kubeClient, vars.Namespace) - Expect(err).ToNot(HaveOccurred()) + BeforeEach(func() { + Expect(k8sClient.DeleteAllOf(context.Background(), &sriovnetworkv1.SriovNetworkNodeState{}, client.InNamespace(testNamespace))).ToNot(HaveOccurred()) - er := NewEventRecorder(snclient, kubeClient) - - t := GinkgoT() - mockCtrl := gomock.NewController(t) - platformHelper := mock_platforms.NewMockInterface(mockCtrl) - platformHelper.EXPECT().GetFlavor().Return(openshift.OpenshiftFlavorDefault).AnyTimes() - platformHelper.EXPECT().IsOpenshiftCluster().Return(false).AnyTimes() - platformHelper.EXPECT().IsHypershift().Return(false).AnyTimes() - - vendorHelper := mock_helper.NewMockHostHelpersInterface(mockCtrl) - vendorHelper.EXPECT().CheckRDMAEnabled().Return(true, nil).AnyTimes() - vendorHelper.EXPECT().TryEnableVhostNet().AnyTimes() - vendorHelper.EXPECT().TryEnableTun().AnyTimes() - vendorHelper.EXPECT().PrepareNMUdevRule([]string{"0x1014", "0x154c"}).Return(nil).AnyTimes() - vendorHelper.EXPECT().PrepareVFRepUdevRule().Return(nil).AnyTimes() - - featureGates := featuregate.New() - - sut = New( - kClient, - snclient, - kubeClient, - vendorHelper, - platformHelper, - exitCh, - stopCh, - syncCh, - refreshCh, - er, - featureGates, - nil, - ) - - sut.loadedPlugins = map[string]plugin.VendorPlugin{generic.PluginName: &fake.FakePlugin{PluginName: "fake"}} - - go func() { - defer GinkgoRecover() - err := sut.Run(stopCh, exitCh) - Expect(err).ToNot(HaveOccurred()) - }() - - SriovDevicePluginPod = corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "sriov-device-plugin-xxxx", - Namespace: vars.Namespace, - Labels: map[string]string{ - "app": "sriov-device-plugin", - }, - }, - Spec: corev1.PodSpec{ - NodeName: "test-node", - }, - } - _, err = sut.kubeClient.CoreV1().Pods(vars.Namespace).Create(context.Background(), &SriovDevicePluginPod, metav1.CreateOptions{}) - Expect(err).ToNot(HaveOccurred()) + mockCtrl = gomock.NewController(t) + hostHelper = mock_helper.NewMockHostHelpersInterface(mockCtrl) + platformHelper = mock_platforms.NewMockInterface(mockCtrl) + + // daemon initialization default mocks + hostHelper.EXPECT().CheckRDMAEnabled().Return(true, nil) + hostHelper.EXPECT().TryEnableTun() + hostHelper.EXPECT().TryEnableVhostNet() + hostHelper.EXPECT().PrepareNMUdevRule([]string{}).Return(nil) + hostHelper.EXPECT().PrepareVFRepUdevRule().Return(nil) + hostHelper.EXPECT().WriteCheckpointFile(gomock.Any()).Return(nil) + + // general + hostHelper.EXPECT().Chroot(gomock.Any()).Return(func() error { return nil }, nil).AnyTimes() + hostHelper.EXPECT().RunCommand("/bin/sh", gomock.Any(), gomock.Any(), gomock.Any()).Return("", "", nil).AnyTimes() }) AfterEach(func() { - close(stopCh) - close(syncCh) - close(exitCh) - close(refreshCh) - - cleanFakeFs() + By("Shutdown controller manager") + cancel() + wg.Wait() }) - Context("Should", func() { - It("restart sriov-device-plugin pod", func() { - - _, err := sut.kubeClient.CoreV1().Nodes(). - Create(context.Background(), &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{Name: "test-node"}, - }, metav1.CreateOptions{}) - Expect(err).To(BeNil()) - - nodeState := &sriovnetworkv1.SriovNetworkNodeState{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node", - Generation: 123, - Annotations: map[string]string{consts.NodeStateDrainAnnotationCurrent: consts.DrainIdle}, - }, - Spec: sriovnetworkv1.SriovNetworkNodeStateSpec{}, - Status: sriovnetworkv1.SriovNetworkNodeStateStatus{ - Interfaces: []sriovnetworkv1.InterfaceExt{ - { - VFs: []sriovnetworkv1.VirtualFunction{ - {}, - }, - DeviceID: "158b", - Driver: "i40e", - Mtu: 1500, - Name: "ens803f0", - PciAddress: "0000:86:00.0", - Vendor: "8086", - NumVfs: 4, - TotalVfs: 64, - }, + Context("Config Daemon", func() { + It("Should expose nodeState Status section", func() { + By("Init mock functions") + afterConfig := false + hostHelper.EXPECT().DiscoverSriovDevices(hostHelper).DoAndReturn(func(helpersInterface helper.HostHelpersInterface) ([]sriovnetworkv1.InterfaceExt, error) { + interfaceExtList := []sriovnetworkv1.InterfaceExt{ + { + Name: "eno1", + Driver: "ice", + PciAddress: "0000:16:00.0", + DeviceID: "1593", + Vendor: "8086", + EswitchMode: "legacy", + LinkAdminState: "up", + LinkSpeed: "10000 Mb/s", + LinkType: "ETH", + Mac: "aa:bb:cc:dd:ee:ff", + Mtu: 1500, + TotalVfs: 2, + NumVfs: 0, }, - }, - } - Expect( - createSriovNetworkNodeState(sut.sriovClient, nodeState)). - To(BeNil()) - - var msg Message - Eventually(refreshCh, "30s").Should(Receive(&msg)) - Expect(msg.syncStatus).To(Equal("InProgress")) + } - Eventually(refreshCh, "30s").Should(Receive(&msg)) - Expect(msg.syncStatus).To(Equal("Succeeded")) + if afterConfig { + interfaceExtList[0].NumVfs = 2 + interfaceExtList[0].VFs = []sriovnetworkv1.VirtualFunction{ + { + Name: "eno1f0", + PciAddress: "0000:16:00.1", + VfID: 0, + }, + { + Name: "eno1f1", + PciAddress: "0000:16:00.2", + VfID: 1, + }} + } + return interfaceExtList, nil + }).AnyTimes() + + hostHelper.EXPECT().LoadPfsStatus("0000:16:00.0").Return(&sriovnetworkv1.Interface{ExternallyManaged: false}, true, nil).AnyTimes() + + hostHelper.EXPECT().ClearPCIAddressFolder().Return(nil).AnyTimes() + hostHelper.EXPECT().DiscoverRDMASubsystem().Return("shared", nil).AnyTimes() + hostHelper.EXPECT().GetCurrentKernelArgs().Return("", nil).AnyTimes() + hostHelper.EXPECT().IsKernelArgsSet("", constants.KernelArgPciRealloc).Return(true).AnyTimes() + hostHelper.EXPECT().IsKernelArgsSet("", constants.KernelArgIntelIommu).Return(true).AnyTimes() + hostHelper.EXPECT().IsKernelArgsSet("", constants.KernelArgIommuPt).Return(true).AnyTimes() + hostHelper.EXPECT().IsKernelArgsSet("", constants.KernelArgIommuPassthrough).Return(true).AnyTimes() + hostHelper.EXPECT().IsKernelArgsSet("", constants.KernelArgRdmaExclusive).Return(false).AnyTimes() + hostHelper.EXPECT().IsKernelArgsSet("", constants.KernelArgRdmaShared).Return(false).AnyTimes() + hostHelper.EXPECT().SetRDMASubsystem("").Return(nil).AnyTimes() + + hostHelper.EXPECT().ConfigSriovInterfaces(gomock.Any(), gomock.Any(), gomock.Any(), false).Return(nil).AnyTimes() + + featureGates := featuregate.New() + featureGates.Init(map[string]bool{}) + dc := CreateDaemon(hostHelper, platformHelper, featureGates, []string{}) + startDaemon(dc) + + _, nodeState := createNode("node1") + By("waiting for state to be succeeded") + EventuallyWithOffset(1, func(g Gomega) { + g.Expect(k8sClient.Get(context.Background(), types.NamespacedName{Namespace: nodeState.Namespace, Name: nodeState.Name}, nodeState)). + ToNot(HaveOccurred()) + + g.Expect(nodeState.Status.SyncStatus).To(Equal(constants.SyncStatusSucceeded)) + }, waitTime, retryTime).Should(Succeed()) + + By("add spec to node state") + err := k8sClient.Get(ctx, types.NamespacedName{Namespace: nodeState.Namespace, Name: nodeState.Name}, nodeState) + Expect(err).ToNot(HaveOccurred()) - Eventually(func() (int, error) { - podList, err := sut.kubeClient.CoreV1().Pods(vars.Namespace).List(context.Background(), metav1.ListOptions{ - LabelSelector: "app=sriov-device-plugin", - FieldSelector: "spec.nodeName=test-node", - }) + nodeState.Spec.Interfaces = []sriovnetworkv1.Interface{ + {Name: "eno1", + PciAddress: "0000:16:00.0", + LinkType: "eth", + NumVfs: 2, + VfGroups: []sriovnetworkv1.VfGroup{ + {ResourceName: "test", + DeviceType: "netdevice", + PolicyName: "test-policy", + VfRange: "eno1#0-1"}, + }}, + } + afterConfig = true + err = k8sClient.Update(ctx, nodeState) + Expect(err).ToNot(HaveOccurred()) + By("waiting to require drain") + EventuallyWithOffset(1, func(g Gomega) { + g.Expect(k8sClient.Get(context.Background(), types.NamespacedName{Namespace: nodeState.Namespace, Name: nodeState.Name}, nodeState)). + ToNot(HaveOccurred()) + g.Expect(dc.GetLastAppliedGeneration()).To(Equal(int64(2))) + }, waitTime, retryTime).Should(Succeed()) + + err = k8sClient.Get(ctx, types.NamespacedName{Namespace: nodeState.Namespace, Name: nodeState.Name}, nodeState) + Expect(err).ToNot(HaveOccurred()) + nodeState.Spec.Interfaces = []sriovnetworkv1.Interface{} + err = k8sClient.Update(ctx, nodeState) + Expect(err).ToNot(HaveOccurred()) - if err != nil { - return 0, err - } + EventuallyWithOffset(1, func(g Gomega) { + g.Expect(k8sClient.Get(context.Background(), types.NamespacedName{Namespace: nodeState.Namespace, Name: nodeState.Name}, nodeState)). + ToNot(HaveOccurred()) - return len(podList.Items), nil - }, "10s").Should(BeZero()) + g.Expect(nodeState.Annotations[constants.NodeStateDrainAnnotation]).To(Equal(constants.DrainRequired)) + }, waitTime, retryTime).Should(Succeed()) - }) + patchAnnotation(nodeState, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete) + // Validate status + EventuallyWithOffset(1, func(g Gomega) { + g.Expect(k8sClient.Get(context.Background(), types.NamespacedName{Namespace: nodeState.Namespace, Name: nodeState.Name}, nodeState)). + ToNot(HaveOccurred()) - It("ignore non latest SriovNetworkNodeState generations", func() { - - _, err := sut.kubeClient.CoreV1().Nodes().Create(context.Background(), &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node", - }, - }, metav1.CreateOptions{}) - Expect(err).To(BeNil()) - - nodeState1 := &sriovnetworkv1.SriovNetworkNodeState{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node", - Generation: 123, - Annotations: map[string]string{consts.NodeStateDrainAnnotationCurrent: consts.DrainIdle}, - }, - } - Expect( - createSriovNetworkNodeState(sut.sriovClient, nodeState1)). - To(BeNil()) - - nodeState2 := &sriovnetworkv1.SriovNetworkNodeState{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node", - Generation: 777, - Annotations: map[string]string{consts.NodeStateDrainAnnotationCurrent: consts.DrainIdle}, - }, - } - Expect( - updateSriovNetworkNodeState(sut.sriovClient, nodeState2)). - To(BeNil()) + g.Expect(nodeState.Annotations[constants.NodeStateDrainAnnotation]).To(Equal(constants.DrainIdle)) + }, waitTime, retryTime).Should(Succeed()) + patchAnnotation(nodeState, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle) - var msg Message - Eventually(refreshCh, "10s").Should(Receive(&msg)) - Expect(msg.syncStatus).To(Equal("InProgress")) + // Validate status + EventuallyWithOffset(1, func(g Gomega) { + g.Expect(k8sClient.Get(context.Background(), types.NamespacedName{Namespace: nodeState.Namespace, Name: nodeState.Name}, nodeState)). + ToNot(HaveOccurred()) - Eventually(refreshCh, "10s").Should(Receive(&msg)) - Expect(msg.syncStatus).To(Equal("Succeeded")) + g.Expect(nodeState.Status.SyncStatus).To(Equal(constants.SyncStatusSucceeded)) + }, waitTime, retryTime).Should(Succeed()) - Expect(sut.desiredNodeState.GetGeneration()).To(BeNumerically("==", 777)) + Expect(nodeState.Status.LastSyncError).To(Equal("")) }) + }) +}) - It("restart all the sriov-device-plugin pods present on the node", func() { - otherPod1 := SriovDevicePluginPod.DeepCopy() - otherPod1.Name = "sriov-device-plugin-xxxa" - _, err := sut.kubeClient.CoreV1().Pods(vars.Namespace).Create(context.Background(), otherPod1, metav1.CreateOptions{}) - Expect(err).ToNot(HaveOccurred()) - - otherPod2 := SriovDevicePluginPod.DeepCopy() - otherPod2.Name = "sriov-device-plugin-xxxz" - _, err = sut.kubeClient.CoreV1().Pods(vars.Namespace).Create(context.Background(), otherPod2, metav1.CreateOptions{}) - Expect(err).ToNot(HaveOccurred()) +func patchAnnotation(nodeState *sriovnetworkv1.SriovNetworkNodeState, key, value string) { + originalNodeState := nodeState.DeepCopy() + nodeState.Annotations[key] = value + err := k8sClient.Patch(ctx, nodeState, client.MergeFrom(originalNodeState)) + Expect(err).ToNot(HaveOccurred()) +} - err = sut.restartDevicePluginPod() - Expect(err).ToNot(HaveOccurred()) +func createNode(nodeName string) (*corev1.Node, *sriovnetworkv1.SriovNetworkNodeState) { + node := corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + Annotations: map[string]string{ + constants.NodeDrainAnnotation: constants.DrainIdle, + "machineconfiguration.openshift.io/desiredConfig": "worker-1", + }, + Labels: map[string]string{ + "test": "", + }, + }, + } + + nodeState := sriovnetworkv1.SriovNetworkNodeState{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + Namespace: testNamespace, + Annotations: map[string]string{ + constants.NodeStateDrainAnnotation: constants.DrainIdle, + constants.NodeStateDrainAnnotationCurrent: constants.DrainIdle, + }, + }, + } - Eventually(func() (int, error) { - podList, err := sut.kubeClient.CoreV1().Pods(vars.Namespace).List(context.Background(), metav1.ListOptions{ - LabelSelector: "app=sriov-device-plugin", - FieldSelector: "spec.nodeName=test-node", - }) + Expect(k8sClient.Create(ctx, &node)).ToNot(HaveOccurred()) + Expect(k8sClient.Create(ctx, &nodeState)).ToNot(HaveOccurred()) + vars.NodeName = nodeName - if err != nil { - return 0, err - } + return &node, &nodeState +} - return len(podList.Items), nil - }, "1s").Should(BeZero()) - }) +func CreateDaemon( + hostHelper helper.HostHelpersInterface, + platformHelper platforms.Interface, + featureGates featuregate.FeatureGate, + disablePlugins []string) *daemon.DaemonReconcile { + kClient, err := client.New( + cfg, + client.Options{ + Scheme: scheme.Scheme}) + Expect(err).ToNot(HaveOccurred()) + + By("Setup controller manager") + k8sManager, err = ctrl.NewManager(cfg, ctrl.Options{ + Scheme: scheme.Scheme, }) -}) + Expect(err).ToNot(HaveOccurred()) -func createSriovNetworkNodeState(c snclient.Interface, nodeState *sriovnetworkv1.SriovNetworkNodeState) error { - _, err := c.SriovnetworkV1(). - SriovNetworkNodeStates(vars.Namespace). - Create(context.Background(), nodeState, metav1.CreateOptions{}) - return err -} + configController := daemon.New(kClient, snclient, kubeclient, hostHelper, platformHelper, eventRecorder, featureGates, disablePlugins) + err = configController.DaemonInitialization() + Expect(err).ToNot(HaveOccurred()) + err = configController.SetupWithManager(k8sManager) + Expect(err).ToNot(HaveOccurred()) -func updateSriovNetworkNodeState(c snclient.Interface, nodeState *sriovnetworkv1.SriovNetworkNodeState) error { - _, err := c.SriovnetworkV1(). - SriovNetworkNodeStates(vars.Namespace). - Update(context.Background(), nodeState, metav1.UpdateOptions{}) - return err + return configController } diff --git a/pkg/daemon/event_recorder.go b/pkg/daemon/event_recorder.go index 25b2d2351..9fe766eb4 100644 --- a/pkg/daemon/event_recorder.go +++ b/pkg/daemon/event_recorder.go @@ -5,8 +5,8 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" typedv1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/log" @@ -22,11 +22,11 @@ type EventRecorder struct { } // NewEventRecorder Create a new EventRecorder -func NewEventRecorder(c snclientset.Interface, kubeclient kubernetes.Interface) *EventRecorder { +func NewEventRecorder(c snclientset.Interface, kubeclient kubernetes.Interface, s *runtime.Scheme) *EventRecorder { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartStructuredLogging(4) eventBroadcaster.StartRecordingToSink(&typedv1core.EventSinkImpl{Interface: kubeclient.CoreV1().Events("")}) - eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "config-daemon"}) + eventRecorder := eventBroadcaster.NewRecorder(s, corev1.EventSource{Component: "config-daemon"}) return &EventRecorder{ client: c, eventRecorder: eventRecorder, @@ -35,8 +35,8 @@ func NewEventRecorder(c snclientset.Interface, kubeclient kubernetes.Interface) } // SendEvent Send an Event on the NodeState object -func (e *EventRecorder) SendEvent(eventType string, msg string) { - nodeState, err := e.client.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).Get(context.Background(), vars.NodeName, metav1.GetOptions{}) +func (e *EventRecorder) SendEvent(ctx context.Context, eventType string, msg string) { + nodeState, err := e.client.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).Get(ctx, vars.NodeName, metav1.GetOptions{}) if err != nil { log.Log.V(2).Error(err, "SendEvent(): Failed to fetch node state, skip SendEvent", "name", vars.NodeName) return diff --git a/pkg/daemon/status.go b/pkg/daemon/status.go new file mode 100644 index 000000000..a95198fdf --- /dev/null +++ b/pkg/daemon/status.go @@ -0,0 +1,142 @@ +package daemon + +import ( + "context" + "fmt" + "reflect" + + "k8s.io/client-go/util/retry" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + + sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars" +) + +const ( + Unknown = "Unknown" +) + +func (dn *DaemonReconcile) updateSyncState(ctx context.Context, desiredNodeState *sriovnetworkv1.SriovNetworkNodeState, status, failedMessage string) error { + funcLog := log.Log.WithName("updateSyncState") + currentNodeState := &sriovnetworkv1.SriovNetworkNodeState{} + desiredNodeState.Status.SyncStatus = status + desiredNodeState.Status.LastSyncError = failedMessage + + retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + if err := dn.client.Get(ctx, client.ObjectKey{desiredNodeState.Namespace, desiredNodeState.Name}, currentNodeState); err != nil { + funcLog.Error(err, "failed to get latest node state", + "SyncStatus", status, + "LastSyncError", failedMessage) + return err + } + + funcLog.V(2).Info("update nodeState status", + "CurrentSyncStatus", currentNodeState.Status.SyncStatus, + "CurrentLastSyncError", currentNodeState.Status.LastSyncError, + "NewSyncStatus", status, + "NewFailedMessage", failedMessage) + + err := dn.client.Status().Patch(ctx, desiredNodeState, client.MergeFrom(currentNodeState)) + if err != nil { + funcLog.Error(err, "failed to update node state status", + "SyncStatus", status, + "LastSyncError", failedMessage) + return err + } + return nil + }) + + if retryErr != nil { + funcLog.Error(retryErr, "failed to update node state status") + return retryErr + } + + dn.recordStatusChangeEvent(ctx, currentNodeState.Status.SyncStatus, status, failedMessage) + return nil +} + +func (dn *DaemonReconcile) shouldUpdateStatus(current, desiredNodeState *sriovnetworkv1.SriovNetworkNodeState) (bool, error) { + // check number of interfaces are equal + if len(current.Status.Interfaces) != len(desiredNodeState.Status.Interfaces) { + return true, nil + } + + // check for bridges + if !reflect.DeepEqual(current.Status.Bridges, desiredNodeState.Status.Bridges) { + return true, nil + } + + // check for system + if !reflect.DeepEqual(current.Status.System, desiredNodeState.Status.System) { + return true, nil + } + + // check for interfaces + // we can't use deep equal here because if we have a vf inside a pod is name will not be available for example + // we use the index for both lists + c := current.Status.DeepCopy().Interfaces + d := desiredNodeState.Status.DeepCopy().Interfaces + for idx := range d { + // check if it's a new device + if d[idx].PciAddress != c[idx].PciAddress { + return true, nil + } + // remove all the vfs + d[idx].VFs = nil + c[idx].VFs = nil + + if !reflect.DeepEqual(d[idx], c[idx]) { + return true, nil + } + } + + return false, nil +} + +func (dn *DaemonReconcile) getHostNetworkStatus(nodeState *sriovnetworkv1.SriovNetworkNodeState) error { + log.Log.WithName("GetHostNetworkStatus").Info("Getting host network status") + var iface []sriovnetworkv1.InterfaceExt + var bridges sriovnetworkv1.Bridges + var err error + + if vars.PlatformType == consts.VirtualOpenStack { + iface, err = dn.platformHelpers.DiscoverSriovDevicesVirtual() + if err != nil { + return err + } + } else { + iface, err = dn.HostHelpers.DiscoverSriovDevices(dn.HostHelpers) + if err != nil { + return err + } + if vars.ManageSoftwareBridges { + bridges, err = dn.HostHelpers.DiscoverBridges() + if err != nil { + return err + } + } + } + + nodeState.Status.Interfaces = iface + nodeState.Status.Bridges = bridges + nodeState.Status.System.RdmaMode, err = dn.HostHelpers.DiscoverRDMASubsystem() + return err +} + +func (dn *DaemonReconcile) recordStatusChangeEvent(ctx context.Context, oldStatus, newStatus, lastError string) { + if oldStatus != newStatus { + if oldStatus == "" { + oldStatus = Unknown + } + if newStatus == "" { + newStatus = Unknown + } + eventMsg := fmt.Sprintf("Status changed from: %s to: %s", oldStatus, newStatus) + if lastError != "" { + eventMsg = fmt.Sprintf("%s. Last Error: %s", eventMsg, lastError) + } + dn.eventRecorder.SendEvent(ctx, "SyncStatusChanged", eventMsg) + } +} diff --git a/pkg/daemon/writer.go b/pkg/daemon/writer.go deleted file mode 100644 index 42eeb2928..000000000 --- a/pkg/daemon/writer.go +++ /dev/null @@ -1,290 +0,0 @@ -package daemon - -import ( - "context" - "encoding/json" - "fmt" - "os" - "path/filepath" - "time" - - "github.com/pkg/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/util/retry" - "sigs.k8s.io/controller-runtime/pkg/log" - - sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" - snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned" - "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" - "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/helper" - "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms" - "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars" -) - -const ( - CheckpointFileName = "sno-initial-node-state.json" - Unknown = "Unknown" -) - -type NodeStateStatusWriter struct { - client snclientset.Interface - status sriovnetworkv1.SriovNetworkNodeStateStatus - OnHeartbeatFailure func() - platformHelper platforms.Interface - hostHelper helper.HostHelpersInterface - eventRecorder *EventRecorder -} - -// NewNodeStateStatusWriter Create a new NodeStateStatusWriter -func NewNodeStateStatusWriter(c snclientset.Interface, - f func(), er *EventRecorder, - hostHelper helper.HostHelpersInterface, - platformHelper platforms.Interface) *NodeStateStatusWriter { - return &NodeStateStatusWriter{ - client: c, - OnHeartbeatFailure: f, - eventRecorder: er, - hostHelper: hostHelper, - platformHelper: platformHelper, - } -} - -// RunOnce initial the interface status for both baremetal and virtual environments -func (w *NodeStateStatusWriter) RunOnce() error { - log.Log.V(0).Info("RunOnce()") - msg := Message{} - - if vars.PlatformType == consts.VirtualOpenStack { - ns, err := w.getCheckPointNodeState() - if err != nil { - return err - } - - if ns == nil { - err = w.platformHelper.CreateOpenstackDevicesInfo() - if err != nil { - return err - } - } else { - w.platformHelper.CreateOpenstackDevicesInfoFromNodeStatus(ns) - } - } - - log.Log.V(0).Info("RunOnce(): first poll for nic status") - if err := w.pollNicStatus(); err != nil { - log.Log.Error(err, "RunOnce(): first poll failed") - } - - ns, err := w.setNodeStateStatus(msg) - if err != nil { - log.Log.Error(err, "RunOnce(): first writing to node status failed") - } - return w.writeCheckpointFile(ns) -} - -// Run reads from the writer channel and sets the interface status. It will -// return if the stop channel is closed. Intended to be run via a goroutine. -func (w *NodeStateStatusWriter) Run(stop <-chan struct{}, refresh <-chan Message, syncCh chan<- struct{}) error { - log.Log.V(0).Info("Run(): start writer") - msg := Message{} - - for { - select { - case <-stop: - log.Log.V(0).Info("Run(): stop writer") - return nil - case msg = <-refresh: - log.Log.V(0).Info("Run(): refresh trigger") - if err := w.pollNicStatus(); err != nil { - continue - } - _, err := w.setNodeStateStatus(msg) - if err != nil { - log.Log.Error(err, "Run() refresh: writing to node status failed") - } - syncCh <- struct{}{} - case <-time.After(30 * time.Second): - log.Log.V(2).Info("Run(): period refresh") - if err := w.pollNicStatus(); err != nil { - continue - } - w.setNodeStateStatus(msg) - } - } -} - -func (w *NodeStateStatusWriter) pollNicStatus() error { - log.Log.V(2).Info("pollNicStatus()") - var iface []sriovnetworkv1.InterfaceExt - var bridges sriovnetworkv1.Bridges - var rdmaMode string - var err error - - if vars.PlatformType == consts.VirtualOpenStack { - iface, err = w.platformHelper.DiscoverSriovDevicesVirtual() - if err != nil { - return err - } - } else { - iface, err = w.hostHelper.DiscoverSriovDevices(w.hostHelper) - if err != nil { - return err - } - if vars.ManageSoftwareBridges { - bridges, err = w.hostHelper.DiscoverBridges() - if err != nil { - return err - } - } - } - - rdmaMode, err = w.hostHelper.DiscoverRDMASubsystem() - if err != nil { - return err - } - - w.status.Interfaces = iface - w.status.Bridges = bridges - w.status.System.RdmaMode = rdmaMode - - return nil -} - -func (w *NodeStateStatusWriter) updateNodeStateStatusRetry(f func(*sriovnetworkv1.SriovNetworkNodeState)) (*sriovnetworkv1.SriovNetworkNodeState, error) { - var nodeState *sriovnetworkv1.SriovNetworkNodeState - var oldStatus, newStatus, lastError string - - err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { - n, getErr := w.getNodeState() - if getErr != nil { - return getErr - } - oldStatus = n.Status.SyncStatus - - // Call the status modifier. - f(n) - - newStatus = n.Status.SyncStatus - lastError = n.Status.LastSyncError - - var err error - nodeState, err = w.client.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).UpdateStatus(context.Background(), n, metav1.UpdateOptions{}) - if err != nil { - log.Log.V(0).Error(err, "updateNodeStateStatusRetry(): fail to update the node status") - } - return err - }) - if err != nil { - // may be conflict if max retries were hit - return nil, fmt.Errorf("unable to update node %v: %v", nodeState, err) - } - - w.recordStatusChangeEvent(oldStatus, newStatus, lastError) - - return nodeState, nil -} - -func (w *NodeStateStatusWriter) setNodeStateStatus(msg Message) (*sriovnetworkv1.SriovNetworkNodeState, error) { - nodeState, err := w.updateNodeStateStatusRetry(func(nodeState *sriovnetworkv1.SriovNetworkNodeState) { - nodeState.Status.Interfaces = w.status.Interfaces - nodeState.Status.Bridges = w.status.Bridges - nodeState.Status.System = w.status.System - if msg.lastSyncError != "" || msg.syncStatus == consts.SyncStatusSucceeded { - // clear lastSyncError when sync Succeeded - nodeState.Status.LastSyncError = msg.lastSyncError - } - nodeState.Status.SyncStatus = msg.syncStatus - - log.Log.V(0).Info("setNodeStateStatus(): status", - "sync-status", nodeState.Status.SyncStatus, - "last-sync-error", nodeState.Status.LastSyncError) - }) - if err != nil { - return nil, err - } - return nodeState, nil -} - -// recordStatusChangeEvent sends event in case oldStatus differs from newStatus -func (w *NodeStateStatusWriter) recordStatusChangeEvent(oldStatus, newStatus, lastError string) { - if oldStatus != newStatus { - if oldStatus == "" { - oldStatus = Unknown - } - if newStatus == "" { - newStatus = Unknown - } - eventMsg := fmt.Sprintf("Status changed from: %s to: %s", oldStatus, newStatus) - if lastError != "" { - eventMsg = fmt.Sprintf("%s. Last Error: %s", eventMsg, lastError) - } - w.eventRecorder.SendEvent("SyncStatusChanged", eventMsg) - } -} - -// getNodeState queries the kube apiserver to get the SriovNetworkNodeState CR -func (w *NodeStateStatusWriter) getNodeState() (*sriovnetworkv1.SriovNetworkNodeState, error) { - var lastErr error - var n *sriovnetworkv1.SriovNetworkNodeState - err := wait.PollImmediate(10*time.Second, 5*time.Minute, func() (bool, error) { - n, lastErr = w.client.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).Get(context.Background(), vars.NodeName, metav1.GetOptions{}) - if lastErr == nil { - return true, nil - } - log.Log.Error(lastErr, "getNodeState(): Failed to fetch node state, close all connections and retry...", "name", vars.NodeName) - // Use the Get() also as an client-go keepalive indicator for the TCP connection. - w.OnHeartbeatFailure() - return false, nil - }) - if err != nil { - if err == wait.ErrWaitTimeout { - return nil, errors.Wrapf(lastErr, "Timed out trying to fetch node %s", vars.NodeName) - } - return nil, err - } - return n, nil -} - -func (w *NodeStateStatusWriter) writeCheckpointFile(ns *sriovnetworkv1.SriovNetworkNodeState) error { - configdir := filepath.Join(vars.Destdir, CheckpointFileName) - file, err := os.OpenFile(configdir, os.O_RDWR|os.O_CREATE, 0644) - if err != nil { - return err - } - defer file.Close() - log.Log.Info("writeCheckpointFile(): try to decode the checkpoint file") - if err = json.NewDecoder(file).Decode(&sriovnetworkv1.InitialState); err != nil { - log.Log.V(2).Error(err, "writeCheckpointFile(): fail to decode, writing new file instead") - log.Log.Info("writeCheckpointFile(): write checkpoint file") - if err = file.Truncate(0); err != nil { - return err - } - if _, err = file.Seek(0, 0); err != nil { - return err - } - if err = json.NewEncoder(file).Encode(*ns); err != nil { - return err - } - sriovnetworkv1.InitialState = *ns - } - return nil -} - -func (w *NodeStateStatusWriter) getCheckPointNodeState() (*sriovnetworkv1.SriovNetworkNodeState, error) { - log.Log.Info("getCheckPointNodeState()") - configdir := filepath.Join(vars.Destdir, CheckpointFileName) - file, err := os.OpenFile(configdir, os.O_RDONLY, 0644) - if err != nil { - if os.IsNotExist(err) { - return nil, nil - } - return nil, err - } - defer file.Close() - if err = json.NewDecoder(file).Decode(&sriovnetworkv1.InitialState); err != nil { - return nil, err - } - - return &sriovnetworkv1.InitialState, nil -} diff --git a/pkg/host/internal/network/network.go b/pkg/host/internal/network/network.go index 3ac17cf8f..e7036f8c1 100644 --- a/pkg/host/internal/network/network.go +++ b/pkg/host/internal/network/network.go @@ -75,7 +75,7 @@ func (n *network) TryToGetVirtualInterfaceName(pciAddr string) string { func (n *network) TryGetInterfaceName(pciAddr string) string { names, err := n.dputilsLib.GetNetNames(pciAddr) if err != nil || len(names) < 1 { - log.Log.Error(err, "TryGetInterfaceName(): failed to get interface name", "pciAddress", pciAddr) + log.Log.V(2).Info("TryGetInterfaceName(): failed to get interface name", "err", err, "pciAddress", pciAddr) return "" } netDevName := names[0] diff --git a/pkg/host/store/store.go b/pkg/host/store/store.go index 67c0b17e3..7e592c76d 100644 --- a/pkg/host/store/store.go +++ b/pkg/host/store/store.go @@ -174,7 +174,7 @@ func (s *manager) WriteCheckpointFile(ns *sriovnetworkv1.SriovNetworkNodeState) defer file.Close() log.Log.Info("WriteCheckpointFile(): try to decode the checkpoint file") if err = json.NewDecoder(file).Decode(&sriovnetworkv1.InitialState); err != nil { - log.Log.V(2).Error(err, "WriteCheckpointFile(): fail to decode, writing new file instead") + log.Log.Error(err, "WriteCheckpointFile(): fail to decode, writing new file instead") log.Log.Info("WriteCheckpointFile(): write checkpoint file") if err = file.Truncate(0); err != nil { return err diff --git a/pkg/log/log.go b/pkg/log/log.go index 1e76facc5..5ecc16893 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -60,6 +60,10 @@ func SetLogLevel(operatorLevel int) { } } +func GetLogLevel() int { + return zapToOperatorLevel(Options.Level.(zzap.AtomicLevel).Level()) +} + func zapToOperatorLevel(zapLevel zapcore.Level) int { return int(zapLevel) * -1 } diff --git a/pkg/plugins/mellanox/mellanox_plugin.go b/pkg/plugins/mellanox/mellanox_plugin.go index 10b0152bb..c89e163db 100644 --- a/pkg/plugins/mellanox/mellanox_plugin.go +++ b/pkg/plugins/mellanox/mellanox_plugin.go @@ -212,7 +212,7 @@ func (p *MellanoxPlugin) Apply() error { if err := p.helpers.MlxConfigFW(attributesToChange); err != nil { return err } - if vars.MlxPluginFwReset { + if vars.FeatureGate.IsEnabled(consts.MellanoxFirmwareResetFeatureGate) { return p.helpers.MlxResetFW(pciAddressesToReset) } return nil diff --git a/pkg/systemd/systemd.go b/pkg/systemd/systemd.go index 704e72c0f..b89312f3f 100644 --- a/pkg/systemd/systemd.go +++ b/pkg/systemd/systemd.go @@ -180,31 +180,31 @@ func WriteSriovResult(result *SriovResult) error { return nil } -func ReadSriovResult() (*SriovResult, error) { +func ReadSriovResult() (*SriovResult, bool, error) { _, err := os.Stat(utils.GetHostExtensionPath(SriovSystemdResultPath)) if err != nil { if os.IsNotExist(err) { log.Log.V(2).Info("ReadSriovResult(): file does not exist, return empty result") - return &SriovResult{}, nil + return nil, false, nil } else { log.Log.Error(err, "ReadSriovResult(): failed to check sriov result file", "path", utils.GetHostExtensionPath(SriovSystemdResultPath)) - return nil, err + return nil, false, err } } rawConfig, err := os.ReadFile(utils.GetHostExtensionPath(SriovSystemdResultPath)) if err != nil { log.Log.Error(err, "ReadSriovResult(): failed to read sriov result file", "path", utils.GetHostExtensionPath(SriovSystemdResultPath)) - return nil, err + return nil, false, err } result := &SriovResult{} err = yaml.Unmarshal(rawConfig, &result) if err != nil { log.Log.Error(err, "ReadSriovResult(): failed to unmarshal sriov result file", "path", utils.GetHostExtensionPath(SriovSystemdResultPath)) - return nil, err + return nil, false, err } - return result, err + return result, true, err } func RemoveSriovResult() error { diff --git a/pkg/utils/cluster.go b/pkg/utils/cluster.go index 5f9aa7065..ad7fee1a2 100644 --- a/pkg/utils/cluster.go +++ b/pkg/utils/cluster.go @@ -127,27 +127,26 @@ func ObjectHasAnnotation(obj metav1.Object, annoKey string, value string) bool { // AnnotateObject adds annotation to a kubernetes object func AnnotateObject(ctx context.Context, obj client.Object, key, value string, c client.Client) error { - newObj := obj.DeepCopyObject().(client.Object) - if newObj.GetAnnotations() == nil { - newObj.SetAnnotations(map[string]string{}) + original := obj.DeepCopyObject().(client.Object) + if obj.GetAnnotations() == nil { + obj.SetAnnotations(map[string]string{}) } - if newObj.GetAnnotations()[key] != value { + if obj.GetAnnotations()[key] != value { log.Log.V(2).Info("AnnotateObject(): Annotate object", "objectName", obj.GetName(), "objectKind", obj.GetObjectKind(), "annotationKey", key, "annotationValue", value) - newObj.GetAnnotations()[key] = value - patch := client.MergeFrom(obj) + obj.GetAnnotations()[key] = value + patch := client.MergeFrom(original) err := c.Patch(ctx, - newObj, patch) + obj, patch) if err != nil { log.Log.Error(err, "annotateObject(): Failed to patch object") return err } } - return nil } diff --git a/pkg/vars/vars.go b/pkg/vars/vars.go index fc7108ed8..44106575c 100644 --- a/pkg/vars/vars.go +++ b/pkg/vars/vars.go @@ -8,6 +8,7 @@ import ( "k8s.io/client-go/rest" "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/featuregate" ) var ( @@ -54,9 +55,6 @@ var ( // ManageSoftwareBridges global variable which reflects state of manageSoftwareBridges feature ManageSoftwareBridges = false - // MlxPluginFwReset global variable enables mstfwreset before rebooting a node on VF changes - MlxPluginFwReset = false - // FilesystemRoot used by test to mock interactions with filesystem FilesystemRoot = "" @@ -75,6 +73,12 @@ var ( // DisableablePlugins contains which plugins can be disabled in sriov config daemon DisableablePlugins = map[string]struct{}{"mellanox": {}} + + // DisableDrain controls if the daemon will drain the node before configuration + DisableDrain = false + + // FeatureGates interface to interact with feature gates + FeatureGate featuregate.FeatureGate ) func init() { @@ -95,4 +99,6 @@ func init() { } ResourcePrefix = os.Getenv("RESOURCE_PREFIX") + + FeatureGate = featuregate.New() } diff --git a/test/util/k8sreporter/reporter.go b/test/util/k8sreporter/reporter.go index 13baac0aa..ab2fde5fe 100644 --- a/test/util/k8sreporter/reporter.go +++ b/test/util/k8sreporter/reporter.go @@ -6,13 +6,13 @@ import ( "strings" kniK8sReporter "github.com/openshift-kni/k8sreporter" + rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/runtime" + monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" + sriovv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" "github.com/k8snetworkplumbingwg/sriov-network-operator/test/util/namespaces" - - monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" - rbacv1 "k8s.io/api/rbac/v1" ) func New(reportPath string) (*kniK8sReporter.KubernetesReporter, error) { @@ -63,6 +63,7 @@ func New(reportPath string) (*kniK8sReporter.KubernetesReporter, error) { {Cr: &sriovv1.SriovNetworkNodePolicyList{}}, {Cr: &sriovv1.SriovNetworkList{}}, {Cr: &sriovv1.SriovOperatorConfigList{}}, + {Cr: &sriovv1.SriovNetworkPoolConfigList{}}, {Cr: &monitoringv1.ServiceMonitorList{}, Namespace: &operatorNamespace}, {Cr: &monitoringv1.PrometheusRuleList{}, Namespace: &operatorNamespace}, {Cr: &rbacv1.RoleList{}, Namespace: &operatorNamespace}, diff --git a/test/util/namespaces/namespaces.go b/test/util/namespaces/namespaces.go index 5ed106398..ac6cfdcc0 100644 --- a/test/util/namespaces/namespaces.go +++ b/test/util/namespaces/namespaces.go @@ -129,6 +129,25 @@ func CleanNetworks(operatorNamespace string, cs *testclient.ClientSet) error { return waitForSriovNetworkDeletion(operatorNamespace, cs, 15*time.Second) } +func CleanPools(operatorNamespace string, cs *testclient.ClientSet) error { + pools := sriovv1.SriovNetworkPoolConfigList{} + err := cs.List(context.Background(), + &pools, + runtimeclient.InNamespace(operatorNamespace)) + if err != nil { + return err + } + for _, p := range pools.Items { + if strings.HasPrefix(p.Name, "test-") { + err := cs.Delete(context.Background(), &p) + if err != nil { + return fmt.Errorf("failed to delete networkPoolConfig %v", err) + } + } + } + return err +} + func waitForSriovNetworkDeletion(operatorNamespace string, cs *testclient.ClientSet, timeout time.Duration) error { return wait.PollImmediate(time.Second, timeout, func() (bool, error) { networks := sriovv1.SriovNetworkList{} @@ -164,7 +183,8 @@ func Clean(operatorNamespace, namespace string, cs *testclient.ClientSet, discov if err != nil { return err } - return nil + + return CleanPools(operatorNamespace, cs) } func AddLabel(cs corev1client.NamespacesGetter, ctx context.Context, namespaceName, key, value string) error {