Skip to content

Commit

Permalink
Implement migration controller
Browse files Browse the repository at this point in the history
This commit introduces drain state migration from the node annotation
to the SriovNetworkNodeState object.
  • Loading branch information
e0ne committed Aug 16, 2023
1 parent 6131bda commit d75c669
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 17 deletions.
36 changes: 24 additions & 12 deletions controllers/migration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,30 @@ package controllers

import (
"context"
"fmt"
v1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned"
constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
)

type MigrationReconciler struct {
client.Client
Scheme *runtime.Scheme
Scheme *runtime.Scheme
ClientSet snclientset.Interface
}

//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;update;patch
//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;create;update;patch

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
Expand All @@ -35,6 +40,9 @@ func (mr *MigrationReconciler) Reconcile(ctx context.Context, req ctrl.Request)
err := mr.Get(ctx, types.NamespacedName{
Name: req.Name}, node)
if err != nil {
if apierrors.IsNotFound(err) {
return reconcile.Result{}, nil
}
reqLogger.Error(err, "Error occurred on GET SriovOperatorConfig request from API server.")
return reconcile.Result{}, err
}
Expand All @@ -47,15 +55,19 @@ func (mr *MigrationReconciler) Reconcile(ctx context.Context, req ctrl.Request)
reqLogger.Error(err, "Error occurred on GET SriovNetworkNodeState request from API server.")
return reconcile.Result{}, err
}
patch := []byte(fmt.Sprintf(`{"status":{"drainStatus":"%s"}}`, anno))
err = mr.Client.Patch(context.TODO(), nodeState, client.RawPatch(types.StrategicMergePatchType, patch))
if err != nil {
reqLogger.Error(err, "Error occurred on SriovNetworkNodeState update.")
return reconcile.Result{}, err
}
nodeState.Status.DrainStatus = anno
mr.ClientSet.SriovnetworkV1().SriovNetworkNodeStates(namespace).UpdateStatus(context.TODO(), nodeState, metav1.UpdateOptions{})
delete(node.Annotations, constants.NodeDrainAnnotation)
mr.Update(ctx, node)
//patch := []byte(fmt.Sprintf(`{"status":{"drainStatus":"%s"}}`, anno))
//err = mr.Client.Patch(context.TODO(), nodeState, client.RawPatch(types.StrategicMergePatchType, patch))
//if err != nil {
// reqLogger.Error(err, "Error occurred on SriovNetworkNodeState update.")
// return reconcile.Result{}, err
//}

}

return reconcile.Result{}, nil
}

Expand Down
23 changes: 20 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ import (
"context"
"flag"
"fmt"
"github.com/golang/glog"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/kubectl/pkg/drain"
"os"
"time"

"github.com/golang/glog"
netattdefv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
openshiftconfigv1 "github.com/openshift/api/config/v1"
mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
Expand All @@ -49,6 +52,7 @@ import (

sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
"github.com/k8snetworkplumbingwg/sriov-network-operator/controllers"
snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned"
constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/leaderelection"
"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
Expand Down Expand Up @@ -136,6 +140,18 @@ func main() {
os.Exit(1)
}

var config *rest.Config
kubeconfig := os.Getenv("KUBECONFIG")
if kubeconfig != "" {
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
} else {
// creates the in-cluster config
config, err = rest.InClusterConfig()
}

setupLog.Info("###config: ", "config", config)
snclient := snclientset.NewForConfigOrDie(config)

if err = (&controllers.SriovNetworkReconciler{
Client: mgrGlobal.GetClient(),
Scheme: mgrGlobal.GetScheme(),
Expand Down Expand Up @@ -199,8 +215,9 @@ func main() {
os.Exit(1)
}
if err = (&controllers.MigrationReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
ClientSet: snclient,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "MigrationReconciler")
os.Exit(1)
Expand Down
11 changes: 10 additions & 1 deletion pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const (
)

// Message is the status payload the daemon sends on its refresh channel
// (dn.refreshCh); each field carries a value destined for the
// SriovNetworkNodeState status.
type Message struct {
// drainStatus is the drain status to record; senders populate it from the
// DrainStatus of the node state they currently hold.
drainStatus string
// syncStatus is the sync status to record (e.g. syncStatusInProgress,
// syncStatusSucceeded, syncStatusFailed).
syncStatus string
// lastSyncError is the text of the most recent sync error; senders set it
// to "" when reporting a non-failed status.
lastSyncError string
}
Expand Down Expand Up @@ -301,6 +302,7 @@ func (dn *Daemon) Run(stopCh <-chan struct{}, exitCh <-chan error) error {
glog.Warningf("Got an error: %v", err)
if more {
dn.refreshCh <- Message{
drainStatus: dn.nodeState.Status.DrainStatus,
syncStatus: syncStatusFailed,
lastSyncError: err.Error(),
}
Expand Down Expand Up @@ -357,8 +359,9 @@ func (dn *Daemon) processNextWorkItem() bool {

err := dn.nodeStateSyncHandler()
if err != nil {
// Ereport error message, and put the item back to work queue for retry.
// Report error message, and put the item back to work queue for retry.
dn.refreshCh <- Message{
drainStatus: dn.nodeState.Status.DrainStatus,
syncStatus: syncStatusFailed,
lastSyncError: err.Error(),
}
Expand Down Expand Up @@ -455,6 +458,7 @@ func (dn *Daemon) nodeStateSyncHandler() error {

// add the error but don't requeue
dn.refreshCh <- Message{
drainStatus: dn.nodeState.Status.DrainStatus,
syncStatus: syncStatusFailed,
lastSyncError: sriovResult.LastSyncError,
}
Expand All @@ -467,6 +471,7 @@ func (dn *Daemon) nodeStateSyncHandler() error {
if latestState.Status.LastSyncError != "" ||
latestState.Status.SyncStatus != syncStatusSucceeded {
dn.refreshCh <- Message{
drainStatus: dn.nodeState.Status.DrainStatus,
syncStatus: syncStatusSucceeded,
lastSyncError: "",
}
Expand All @@ -481,6 +486,7 @@ func (dn *Daemon) nodeStateSyncHandler() error {
glog.V(0).Infof("nodeStateSyncHandler(): Name: %s, Interface policy spec not yet set by controller", latestState.Name)
if latestState.Status.SyncStatus != "Succeeded" {
dn.refreshCh <- Message{
drainStatus: latestState.Status.DrainStatus,
syncStatus: "Succeeded",
lastSyncError: "",
}
Expand All @@ -491,6 +497,7 @@ func (dn *Daemon) nodeStateSyncHandler() error {
}

dn.refreshCh <- Message{
drainStatus: latestState.Status.DrainStatus,
syncStatus: syncStatusInProgress,
lastSyncError: "",
}
Expand Down Expand Up @@ -649,11 +656,13 @@ func (dn *Daemon) nodeStateSyncHandler() error {
dn.nodeState = latestState.DeepCopy()
if dn.useSystemdService {
dn.refreshCh <- Message{
drainStatus: dn.nodeState.Status.DrainStatus,
syncStatus: sriovResult.SyncStatus,
lastSyncError: sriovResult.LastSyncError,
}
} else {
dn.refreshCh <- Message{
drainStatus: dn.nodeState.Status.DrainStatus,
syncStatus: syncStatusSucceeded,
lastSyncError: "",
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/daemon/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,14 @@ func (w *NodeStateStatusWriter) updateNodeStateStatusRetry(f func(*sriovnetworkv
func (w *NodeStateStatusWriter) setNodeStateStatus(msg Message) (*sriovnetworkv1.SriovNetworkNodeState, error) {
nodeState, err := w.updateNodeStateStatusRetry(func(nodeState *sriovnetworkv1.SriovNetworkNodeState) {
nodeState.Status.Interfaces = w.status.Interfaces
nodeState.Status.DrainStatus = msg.drainStatus
if msg.lastSyncError != "" || msg.syncStatus == syncStatusSucceeded {
// clear lastSyncError when sync Succeeded
nodeState.Status.LastSyncError = msg.lastSyncError
}
nodeState.Status.SyncStatus = msg.syncStatus

glog.V(0).Infof("setNodeStateStatus(): syncStatus: %s, lastSyncError: %s", nodeState.Status.SyncStatus, nodeState.Status.LastSyncError)
glog.V(0).Infof("setNodeStateStatus(): drainStatus: %s, syncStatus: %s, lastSyncError: %s", nodeState.Status.DrainStatus, nodeState.Status.SyncStatus, nodeState.Status.LastSyncError)
})
if err != nil {
return nil, err
Expand Down

0 comments on commit d75c669

Please sign in to comment.