Skip to content

Commit

Permalink
set MaxConcurrentReconciles for VolumeReplication
Browse files Browse the repository at this point in the history
Added option to set MaxConcurrentReconciles
for VolumeReplication which was pending and for
all other controllers this is set in csi-addons#203

Signed-off-by: Madhu Rajanna <[email protected]>
  • Loading branch information
Madhu-1 committed Aug 24, 2022
1 parent 2fc35bb commit c76655a
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 12 deletions.
3 changes: 1 addition & 2 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ func main() {
Scheme: mgr.GetScheme(),
Connpool: connPool,
Timeout: defaultTimeout,
Log: ctrl.Log.WithName("controllers").WithName("VolumeReplication"),
}).SetupWithManager(mgr); err != nil {
}).SetupWithManager(mgr, ctrlOptions); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "VolumeReplication")
os.Exit(1)
}
Expand Down
6 changes: 3 additions & 3 deletions controllers/replication.storage/pvc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controllers

import (
"context"
"testing"

replicationv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/apis/replication.storage/v1alpha1"
Expand All @@ -27,7 +28,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log"
)

const (
Expand Down Expand Up @@ -102,7 +103,6 @@ func createFakeVolumeReplicationReconciler(t *testing.T, obj ...runtime.Object)
return VolumeReplicationReconciler{
Client: client,
Scheme: scheme,
Log: logf.Log.WithName("controller_volumereplication_test"),
}
}

Expand Down Expand Up @@ -164,7 +164,7 @@ func TestGetVolumeHandle(t *testing.T) {
}

reconciler := createFakeVolumeReplicationReconciler(t, testPV, testPVC, volumeReplication)
resultPVC, resultPV, err := reconciler.getPVCDataSource(reconciler.Log, namespacedName)
resultPVC, resultPV, err := reconciler.getPVCDataSource(log.FromContext(context.TODO()), namespacedName)
if tc.errorExpected {
assert.Error(t, err)
} else {
Expand Down
14 changes: 8 additions & 6 deletions controllers/replication.storage/volumereplication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
Expand All @@ -59,7 +61,6 @@ var (
// VolumeReplicationReconciler reconciles a VolumeReplication object.
type VolumeReplicationReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
// ConnectionPool consists of map of Connection objects
Connpool *conn.ConnectionPool
Expand All @@ -81,7 +82,7 @@ type VolumeReplicationReconciler struct {
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := r.Log.WithValues("Request.Name", req.Name, "Request.Namespace", req.Namespace)
logger := log.FromContext(ctx, "Request.Name", req.Name, "Request.Namespace", req.Namespace)

// Fetch VolumeReplication instance
instance := &replicationv1alpha1.VolumeReplication{}
Expand Down Expand Up @@ -385,10 +386,9 @@ func (r *VolumeReplicationReconciler) updateReplicationStatus(
}

// SetupWithManager sets up the controller with the Manager.
func (r *VolumeReplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *VolumeReplicationReconciler) SetupWithManager(mgr ctrl.Manager, ctrlOptions controller.Options) error {
err := r.waitForCrds()
if err != nil {
r.Log.Error(err, "failed to wait for crds")

return err
}
Expand All @@ -397,11 +397,13 @@ func (r *VolumeReplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {

return ctrl.NewControllerManagedBy(mgr).
For(&replicationv1alpha1.VolumeReplication{}).
WithEventFilter(pred).Complete(r)
WithEventFilter(pred).
WithOptions(ctrlOptions).
Complete(r)
}

func (r *VolumeReplicationReconciler) waitForCrds() error {
logger := r.Log.WithName("checkingDependencies")
logger := log.FromContext(context.TODO(), "Name", "checkingDependencies")

err := r.waitForVolumeReplicationResource(logger, volumeReplicationClass)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controllers

import (
"context"
"testing"

replicationv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/apis/replication.storage/v1alpha1"
Expand All @@ -25,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/log"
)

var mockVolumeReplicationClassObj = &replicationv1alpha1.VolumeReplicationClass{
Expand Down Expand Up @@ -61,7 +63,7 @@ func TestGetVolumeReplicaClass(t *testing.T) {
}

reconciler := createFakeVolumeReplicationReconciler(t, objects...)
vrcObj, err := reconciler.getVolumeReplicationClass(reconciler.Log, mockVolumeReplicationClassObj.Name)
vrcObj, err := reconciler.getVolumeReplicationClass(log.FromContext(context.TODO()), mockVolumeReplicationClassObj.Name)

if tc.errorExpected {
assert.Error(t, err)
Expand Down

0 comments on commit c76655a

Please sign in to comment.