diff --git a/controllers/kafkacluster_controller.go b/controllers/kafkacluster_controller.go index f59e16024..4bf5b3405 100644 --- a/controllers/kafkacluster_controller.go +++ b/controllers/kafkacluster_controller.go @@ -60,7 +60,6 @@ var clusterUsersFinalizer = "users.kafkaclusters.kafka.banzaicloud.io" type KafkaClusterReconciler struct { client.Client DirectClient client.Reader - Log logr.Logger Namespaces []string KafkaClientProvider kafkaclient.Provider } @@ -85,7 +84,7 @@ type KafkaClusterReconciler struct { // +kubebuilder:rbac:groups=networking.istio.io,resources=*,verbs=* func (r *KafkaClusterReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) { - log := r.Log.WithValues("Request.Namespace", request.NamespacedName, "Request.Name", request.Name) + log := logr.FromContextOrDiscard(ctx) log.Info("Reconciling KafkaCluster") @@ -104,7 +103,7 @@ func (r *KafkaClusterReconciler) Reconcile(ctx context.Context, request ctrl.Req // Check if marked for deletion and run finalizers if k8sutil.IsMarkedForDeletion(instance.ObjectMeta) { - return r.checkFinalizers(ctx, log, instance) + return r.checkFinalizers(ctx, instance) } if instance.Status.State != v1beta1.KafkaClusterRollingUpgrading { @@ -192,7 +191,8 @@ func (r *KafkaClusterReconciler) Reconcile(ctx context.Context, request ctrl.Req return reconciled() } -func (r *KafkaClusterReconciler) checkFinalizers(ctx context.Context, log logr.Logger, cluster *v1beta1.KafkaCluster) (ctrl.Result, error) { +func (r *KafkaClusterReconciler) checkFinalizers(ctx context.Context, cluster *v1beta1.KafkaCluster) (ctrl.Result, error) { + log := logr.FromContextOrDiscard(ctx) log.Info("KafkaCluster is marked for deletion, checking for children") // If the main finalizer is gone then we've already finished up @@ -287,7 +287,7 @@ func (r *KafkaClusterReconciler) checkFinalizers(ctx context.Context, log logr.L // Do any necessary PKI cleanup - a PKI backend should make sure any // user finalizations are done before it does its final cleanup log.Info("Tearing down any PKI resources for the kafkacluster") - if err = pki.GetPKIManager(r.Client, cluster, v1beta1.PKIBackendProvided, r.Log).FinalizePKI(ctx, log); err != nil { + if err = pki.GetPKIManager(r.Client, cluster, v1beta1.PKIBackendProvided).FinalizePKI(ctx); err != nil { switch err.(type) { case errorfactory.ResourceNotReady: log.Info("The PKI is not ready to be torn down") @@ -349,7 +349,8 @@ func (r *KafkaClusterReconciler) updateAndFetchLatest(ctx context.Context, clust } // SetupKafkaClusterWithManager registers kafka cluster controller to the manager -func SetupKafkaClusterWithManager(mgr ctrl.Manager, log logr.Logger) *ctrl.Builder { +func SetupKafkaClusterWithManager(mgr ctrl.Manager) *ctrl.Builder { + log := mgr.GetLogger() builder := ctrl.NewControllerManagedBy(mgr). For(&v1beta1.KafkaCluster{}).Named("KafkaCluster") diff --git a/controllers/kafkatopic_controller.go b/controllers/kafkatopic_controller.go index d52850195..928c37a88 100644 --- a/controllers/kafkatopic_controller.go +++ b/controllers/kafkatopic_controller.go @@ -92,7 +92,6 @@ type KafkaTopicReconciler struct { // that reads objects from the cache and writes to the apiserver Client client.Client Scheme *runtime.Scheme - Log logr.Logger } // +kubebuilder:rbac:groups=kafka.banzaicloud.io,resources=kafkatopics,verbs=get;list;watch;create;update;patch;delete;deletecollection @@ -100,7 +99,7 @@ type KafkaTopicReconciler struct { // Reconcile reconciles the kafka topic func (r *KafkaTopicReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { - reqLogger := r.Log.WithValues("kafkatopic", request.NamespacedName, "Request.Name", request.Name) + reqLogger := logr.FromContextOrDiscard(ctx) reqLogger.Info("Reconciling KafkaTopic") var err error @@ -141,7 +140,7 @@ func (r *KafkaTopicReconciler) Reconcile(ctx context.Context, request reconcile. // Check if marked for deletion and if so run finalizers if k8sutil.IsMarkedForDeletion(instance.ObjectMeta) { - return r.checkFinalizers(ctx, reqLogger, broker, instance) + return r.checkFinalizers(ctx, broker, instance) } // Check if the topic already exists @@ -226,7 +225,8 @@ func (r *KafkaTopicReconciler) updateAndFetchLatest(ctx context.Context, topic * return topic, nil } -func (r *KafkaTopicReconciler) checkFinalizers(ctx context.Context, reqLogger logr.Logger, broker kafkaclient.KafkaClient, topic *v1alpha1.KafkaTopic) (reconcile.Result, error) { +func (r *KafkaTopicReconciler) checkFinalizers(ctx context.Context, broker kafkaclient.KafkaClient, topic *v1alpha1.KafkaTopic) (reconcile.Result, error) { + reqLogger := logr.FromContextOrDiscard(ctx) reqLogger.Info("Kafka topic is marked for deletion") var err error if util.StringSliceContains(topic.GetFinalizers(), topicFinalizer) { diff --git a/controllers/kafkauser_controller.go b/controllers/kafkauser_controller.go index 377ac34b7..e41516bcc 100644 --- a/controllers/kafkauser_controller.go +++ b/controllers/kafkauser_controller.go @@ -56,7 +56,8 @@ import ( var userFinalizer = "finalizer.kafkausers.kafka.banzaicloud.io" // SetupKafkaUserWithManager registers KafkaUser controller to the manager -func SetupKafkaUserWithManager(mgr ctrl.Manager, certSigningEnabled bool, certManagerNamespace bool, log logr.Logger) *ctrl.Builder { +func SetupKafkaUserWithManager(mgr ctrl.Manager, certSigningEnabled bool, certManagerNamespace bool) *ctrl.Builder { + log := mgr.GetLogger() builder := ctrl.NewControllerManagedBy(mgr). For(&v1alpha1.KafkaUser{}).Named("KafkaUser") if certSigningEnabled { @@ -154,7 +155,6 @@ type KafkaUserReconciler struct { // that reads objects from the cache and writes to the apiserver Client client.Client Scheme *runtime.Scheme - Log logr.Logger } // +kubebuilder:rbac:groups=kafka.banzaicloud.io,resources=kafkausers,verbs=get;list;watch;create;update;patch;delete;deletecollection @@ -167,7 +167,7 @@ type KafkaUserReconciler struct { // Reconcile reads that state of the cluster for a KafkaUser object and makes changes based on the state read // and what is in the KafkaUser.Spec func (r *KafkaUserReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { - reqLogger := r.Log.WithValues("kafkauser", request.NamespacedName, "Request.Name", request.Name) + reqLogger := logr.FromContextOrDiscard(ctx) reqLogger.Info("Reconciling KafkaUser") var err error @@ -212,7 +212,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, request reconcile.R backend = v1beta1.PKIBackendProvided } - pkiManager := pki.GetPKIManager(r.Client, cluster, backend, r.Log) + pkiManager := pki.GetPKIManager(r.Client, cluster, backend) user, err := pkiManager.ReconcileUserCertificate(ctx, instance, r.Scheme, cluster.Spec.GetKubernetesClusterDomain()) if err != nil { @@ -257,7 +257,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, request reconcile.R // check if marked for deletion and remove kafka ACLs if k8sutil.IsMarkedForDeletion(instance.ObjectMeta) { - return r.checkFinalizers(ctx, reqLogger, cluster, instance, kafkaUser) + return r.checkFinalizers(ctx, cluster, instance, kafkaUser) } // ensure a kafkaCluster label @@ -324,7 +324,8 @@ func (r *KafkaUserReconciler) updateAndFetchLatest(ctx context.Context, user *v1 return user, nil } -func (r *KafkaUserReconciler) checkFinalizers(ctx context.Context, reqLogger logr.Logger, cluster *v1beta1.KafkaCluster, instance *v1alpha1.KafkaUser, user string) (reconcile.Result, error) { +func (r *KafkaUserReconciler) checkFinalizers(ctx context.Context, cluster *v1beta1.KafkaCluster, instance *v1alpha1.KafkaUser, user string) (reconcile.Result, error) { + reqLogger := logr.FromContextOrDiscard(ctx) // run finalizers var err error if util.StringSliceContains(instance.GetFinalizers(), userFinalizer) { diff --git a/controllers/tests/clusterregistry/suite_test.go b/controllers/tests/clusterregistry/suite_test.go index 4ef93ba17..68ee810dd 100644 --- a/controllers/tests/clusterregistry/suite_test.go +++ b/controllers/tests/clusterregistry/suite_test.go @@ -133,7 +133,7 @@ var _ = BeforeSuite(func() { Expect(mgr).ToNot(BeNil()) kafkaClusterReconciler = NewTestReconciler() - err = controllers.SetupKafkaClusterWithManager(mgr, logf.Log).Named("KafkaCluster").Complete(kafkaClusterReconciler) + err = controllers.SetupKafkaClusterWithManager(mgr).Named("KafkaCluster").Complete(kafkaClusterReconciler) Expect(err).NotTo(HaveOccurred()) kafkaTopicReconciler = NewTestReconciler() @@ -142,7 +142,7 @@ var _ = BeforeSuite(func() { // Create a new kafka user reconciler kafkaUserReconciler = NewTestReconciler() - err = controllers.SetupKafkaUserWithManager(mgr, true, true, logf.Log).Named("KafkaUser").Complete(kafkaUserReconciler) + err = controllers.SetupKafkaUserWithManager(mgr, true, true).Named("KafkaUser").Complete(kafkaUserReconciler) Expect(err).NotTo(HaveOccurred()) cruiseControlTaskReconciler = NewTestReconciler() diff --git a/controllers/tests/suite_test.go b/controllers/tests/suite_test.go index 8c009a031..cabbf2020 100644 --- a/controllers/tests/suite_test.go +++ b/controllers/tests/suite_test.go @@ -146,17 +146,15 @@ var _ = BeforeSuite(func() { kafkaClusterReconciler := controllers.KafkaClusterReconciler{ Client: mgr.GetClient(), DirectClient: mgr.GetAPIReader(), - Log: ctrl.Log.WithName("controllers").WithName("KafkaCluster"), KafkaClientProvider: kafkaclient.NewMockProvider(), } - err = controllers.SetupKafkaClusterWithManager(mgr, kafkaClusterReconciler.Log).Complete(&kafkaClusterReconciler) + err = controllers.SetupKafkaClusterWithManager(mgr).Complete(&kafkaClusterReconciler) Expect(err).NotTo(HaveOccurred()) kafkaTopicReconciler := &controllers.KafkaTopicReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), - Log: ctrl.Log.WithName("controllers").WithName("KafkaTopic"), } err = controllers.SetupKafkaTopicWithManager(mgr, 10).Complete(kafkaTopicReconciler) @@ -166,10 +164,9 @@ var _ = BeforeSuite(func() { kafkaUserReconciler := controllers.KafkaUserReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), - Log: ctrl.Log.WithName("controllers").WithName("KafkaUser"), } - err = controllers.SetupKafkaUserWithManager(mgr, true, true, kafkaUserReconciler.Log).Complete(&kafkaUserReconciler) + err = controllers.SetupKafkaUserWithManager(mgr, true, true).Complete(&kafkaUserReconciler) Expect(err).NotTo(HaveOccurred()) kafkaClusterCCReconciler := controllers.CruiseControlTaskReconciler{ diff --git a/main.go b/main.go index a194735e2..46a47a39f 100644 --- a/main.go +++ b/main.go @@ -152,11 +152,10 @@ func main() { Client: mgr.GetClient(), DirectClient: mgr.GetAPIReader(), Namespaces: namespaceList, - Log: ctrl.Log.WithName("controllers").WithName("KafkaCluster"), KafkaClientProvider: kafkaclient.NewDefaultProvider(), } - if err = controllers.SetupKafkaClusterWithManager(mgr, kafkaClusterReconciler.Log).Complete(kafkaClusterReconciler); err != nil { + if err = controllers.SetupKafkaClusterWithManager(mgr).Complete(kafkaClusterReconciler); err != nil { setupLog.Error(err, "unable to create controller", "controller", "KafkaCluster") os.Exit(1) } @@ -164,7 +163,6 @@ func main() { kafkaTopicReconciler := &controllers.KafkaTopicReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), - Log: ctrl.Log.WithName("controllers").WithName("KafkaTopic"), } if err = controllers.SetupKafkaTopicWithManager(mgr, maxKafkaTopicConcurrentReconciles).Complete(kafkaTopicReconciler); err != nil { @@ -176,10 +174,9 @@ func main() { kafkaUserReconciler := &controllers.KafkaUserReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), - Log: ctrl.Log.WithName("controllers").WithName("KafkaUser"), } - if err = controllers.SetupKafkaUserWithManager(mgr, !certSigningDisabled, certManagerEnabled, kafkaUserReconciler.Log).Complete(kafkaUserReconciler); err != nil { + if err = controllers.SetupKafkaUserWithManager(mgr, !certSigningDisabled, certManagerEnabled).Complete(kafkaUserReconciler); err != nil { setupLog.Error(err, "unable to create controller", "controller", "KafkaUser") os.Exit(1) } diff --git a/pkg/k8sutil/status.go b/pkg/k8sutil/status.go index 1fef22be3..a7abf4910 100644 --- a/pkg/k8sutil/status.go +++ b/pkg/k8sutil/status.go @@ -261,8 +261,9 @@ func UpdateRollingUpgradeState(c client.Client, cluster *banzaicloudv1beta1.Kafk return nil } -func UpdateListenerStatuses(ctx context.Context, c client.Client, cluster *banzaicloudv1beta1.KafkaCluster, logger logr.Logger, - intListenerStatuses, extListenerStatuses map[string]banzaicloudv1beta1.ListenerStatusList) error { +func UpdateListenerStatuses(ctx context.Context, c client.Client, cluster *banzaicloudv1beta1.KafkaCluster, intListenerStatuses, extListenerStatuses map[string]banzaicloudv1beta1.ListenerStatusList) error { + logger := logr.FromContextOrDiscard(ctx) + typeMeta := cluster.TypeMeta cluster.Status.ListenerStatuses = banzaicloudv1beta1.ListenerStatuses{ diff --git a/pkg/kafkaclient/config.go b/pkg/kafkaclient/config.go index 3297d4371..0088d485a 100644 --- a/pkg/kafkaclient/config.go +++ b/pkg/kafkaclient/config.go @@ -49,7 +49,7 @@ func ClusterConfig(client client.Client, cluster *v1beta1.KafkaCluster) (*KafkaC if cluster.Spec.GetClientSSLCertSecretName() != "" { tlsConfig, err = util.GetClientTLSConfig(client, types.NamespacedName{Name: cluster.Spec.GetClientSSLCertSecretName(), Namespace: cluster.Namespace}) } else if cluster.Spec.ListenersConfig.SSLSecrets != nil { - tlsConfig, err = pki.GetPKIManager(client, cluster, v1beta1.PKIBackendProvided, log).GetControllerTLSConfig() + tlsConfig, err = pki.GetPKIManager(client, cluster, v1beta1.PKIBackendProvided).GetControllerTLSConfig() } else { err = errors.New("either 'clientSSLCertSecret' or 'sslSecrets' must be specified as internal listener used for inner communication uses SSL") } diff --git a/pkg/pki/certmanagerpki/certmanager_pki.go b/pkg/pki/certmanagerpki/certmanager_pki.go index b56f7c14c..2a05405d5 100644 --- a/pkg/pki/certmanagerpki/certmanager_pki.go +++ b/pkg/pki/certmanagerpki/certmanager_pki.go @@ -35,7 +35,8 @@ import ( pkicommon "github.com/banzaicloud/koperator/pkg/util/pki" ) -func (c *certManager) FinalizePKI(ctx context.Context, logger logr.Logger) error { +func (c *certManager) FinalizePKI(ctx context.Context) error { + logger := logr.FromContextOrDiscard(ctx) logger.Info("Removing cert-manager certificates and secrets") // Safety check that we are actually doing something @@ -87,7 +88,8 @@ func (c *certManager) FinalizePKI(ctx context.Context, logger logr.Logger) error return nil } -func (c *certManager) ReconcilePKI(ctx context.Context, logger logr.Logger, extListenerStatuses map[string]v1beta1.ListenerStatusList) (err error) { +func (c *certManager) ReconcilePKI(ctx context.Context, extListenerStatuses map[string]v1beta1.ListenerStatusList) (err error) { + logger := logr.FromContextOrDiscard(ctx) logger.Info("Reconciling cert-manager PKI") resources, err := c.kafkapki(ctx, extListenerStatuses) @@ -96,7 +98,7 @@ func (c *certManager) ReconcilePKI(ctx context.Context, logger logr.Logger, extL } for _, o := range resources { - if err := reconcile(ctx, logger, c.client, o); err != nil { + if err := reconcile(ctx, c.client, o); err != nil { return err } } diff --git a/pkg/pki/certmanagerpki/certmanager_pki_test.go b/pkg/pki/certmanagerpki/certmanager_pki_test.go index 7c941b8e2..67a28ea1e 100644 --- a/pkg/pki/certmanagerpki/certmanager_pki_test.go +++ b/pkg/pki/certmanagerpki/certmanager_pki_test.go @@ -21,7 +21,6 @@ import ( "testing" corev1 "k8s.io/api/core/v1" - ctrl "sigs.k8s.io/controller-runtime" "github.com/banzaicloud/koperator/api/v1alpha1" "github.com/banzaicloud/koperator/api/v1beta1" @@ -30,8 +29,6 @@ import ( pkicommon "github.com/banzaicloud/koperator/pkg/util/pki" ) -var log = ctrl.Log.WithName("testing") - const ( testNamespace = "test-namespace" ) @@ -93,7 +90,7 @@ func TestFinalizePKI(t *testing.T) { t.Error("Expected no error during initialization, got:", err) } - if err := manager.FinalizePKI(context.Background(), log); err != nil { + if err := manager.FinalizePKI(context.Background()); err != nil { t.Error("Expected no error on finalize, got:", err) } } @@ -109,7 +106,7 @@ func TestReconcilePKI(t *testing.T) { if err := manager.client.Create(ctx, newServerSecret()); err != nil { t.Error("error during server secret creation", reflect.TypeOf(err)) } - if err := manager.ReconcilePKI(ctx, log, make(map[string]v1beta1.ListenerStatusList)); err != nil { + if err := manager.ReconcilePKI(ctx, make(map[string]v1beta1.ListenerStatusList)); err != nil { if reflect.TypeOf(err) != reflect.TypeOf(errorfactory.ResourceNotReady{}) { t.Error("Expected not ready error, got:", reflect.TypeOf(err)) } @@ -118,7 +115,7 @@ func TestReconcilePKI(t *testing.T) { if err := manager.client.Create(ctx, newControllerSecret()); err != nil { t.Error("error during controller secret creation", reflect.TypeOf(err)) } - if err := manager.ReconcilePKI(ctx, log, make(map[string]v1beta1.ListenerStatusList)); err != nil { + if err := manager.ReconcilePKI(ctx, make(map[string]v1beta1.ListenerStatusList)); err != nil { if reflect.TypeOf(err) != reflect.TypeOf(errorfactory.ResourceNotReady{}) { t.Error("Expected not ready error, got:", reflect.TypeOf(err)) } @@ -127,7 +124,7 @@ func TestReconcilePKI(t *testing.T) { if err := manager.client.Create(ctx, newCASecret()); err != nil { t.Error("error during CA secret creation", reflect.TypeOf(err)) } - if err := manager.ReconcilePKI(ctx, log, make(map[string]v1beta1.ListenerStatusList)); err != nil { + if err := manager.ReconcilePKI(ctx, make(map[string]v1beta1.ListenerStatusList)); err != nil { t.Error("Expected successful reconcile, got:", err) } @@ -136,7 +133,7 @@ func TestReconcilePKI(t *testing.T) { if err != nil { t.Error("Expected no error during mocking the cluster, got:", err) } - if err := manager.ReconcilePKI(ctx, log, make(map[string]v1beta1.ListenerStatusList)); err == nil { + if err := manager.ReconcilePKI(ctx, make(map[string]v1beta1.ListenerStatusList)); err == nil { t.Error("Expected error got nil") } else if reflect.TypeOf(err) != reflect.TypeOf(errorfactory.ResourceNotReady{}) { t.Error("Expected not ready error, got:", reflect.TypeOf(err)) @@ -144,7 +141,7 @@ func TestReconcilePKI(t *testing.T) { if err := manager.client.Create(ctx, newPreCreatedSecret()); err != nil { t.Error("error during pre created secret creation", reflect.TypeOf(err)) } - if err := manager.ReconcilePKI(ctx, log, make(map[string]v1beta1.ListenerStatusList)); err != nil { + if err := manager.ReconcilePKI(ctx, make(map[string]v1beta1.ListenerStatusList)); err != nil { t.Error("Expected successful reconcile, got:", err) } } diff --git a/pkg/pki/certmanagerpki/reconcile.go b/pkg/pki/certmanagerpki/reconcile.go index f853f25a7..5a51c048f 100644 --- a/pkg/pki/certmanagerpki/reconcile.go +++ b/pkg/pki/certmanagerpki/reconcile.go @@ -19,7 +19,6 @@ import ( "fmt" "reflect" - "github.com/go-logr/logr" certv1 "github.com/jetstack/cert-manager/pkg/apis/certmanager/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -31,23 +30,23 @@ import ( ) // reconcile ensures the given kubernetes object -func reconcile(ctx context.Context, log logr.Logger, client client.Client, object runtime.Object) (err error) { +func reconcile(ctx context.Context, client client.Client, object runtime.Object) (err error) { switch o := object.(type) { case *certv1.ClusterIssuer: - return reconcileClusterIssuer(ctx, log, client, o) + return reconcileClusterIssuer(ctx, client, o) case *certv1.Certificate: - return reconcileCertificate(ctx, log, client, o) + return reconcileCertificate(ctx, client, o) case *corev1.Secret: - return reconcileSecret(ctx, log, client, o) + return reconcileSecret(ctx, client, o) case *v1alpha1.KafkaUser: - return reconcileUser(ctx, log, client, o) + return reconcileUser(ctx, client, o) default: panic(fmt.Sprintf("Invalid object type: %v", reflect.TypeOf(object))) } } // reconcileClusterIssuer ensures a cert-manager ClusterIssuer -func reconcileClusterIssuer(ctx context.Context, _ logr.Logger, client client.Client, issuer *certv1.ClusterIssuer) error { +func reconcileClusterIssuer(ctx context.Context, client client.Client, issuer *certv1.ClusterIssuer) error { obj := &certv1.ClusterIssuer{} var err error if err = client.Get(ctx, types.NamespacedName{Name: issuer.Name, Namespace: issuer.Namespace}, obj); err != nil { @@ -60,7 +59,7 @@ func reconcileClusterIssuer(ctx context.Context, _ logr.Logger, client client.Cl } // reconcileCertificate ensures a cert-manager certificate -func reconcileCertificate(ctx context.Context, _ logr.Logger, client client.Client, cert *certv1.Certificate) error { +func reconcileCertificate(ctx context.Context, client client.Client, cert *certv1.Certificate) error { obj := &certv1.Certificate{} var err error if err = client.Get(ctx, types.NamespacedName{Name: cert.Name, Namespace: cert.Namespace}, obj); err != nil { @@ -73,7 +72,7 @@ func reconcileCertificate(ctx context.Context, _ logr.Logger, client client.Clie } // reconcileSecret ensures a Kubernetes secret -func reconcileSecret(ctx context.Context, _ logr.Logger, client client.Client, secret *corev1.Secret) error { +func reconcileSecret(ctx context.Context, client client.Client, secret *corev1.Secret) error { obj := &corev1.Secret{} var err error if err = client.Get(ctx, types.NamespacedName{Name: secret.Name, Namespace: secret.Namespace}, obj); err != nil { @@ -86,7 +85,7 @@ func reconcileSecret(ctx context.Context, _ logr.Logger, client client.Client, s } // reconcileUser ensures a v1alpha1.KafkaUser -func reconcileUser(ctx context.Context, _ logr.Logger, client client.Client, user *v1alpha1.KafkaUser) error { +func reconcileUser(ctx context.Context, client client.Client, user *v1alpha1.KafkaUser) error { obj := &v1alpha1.KafkaUser{} var err error if err = client.Get(ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, obj); err != nil { diff --git a/pkg/pki/k8scsrpki/k8scsr.go b/pkg/pki/k8scsrpki/k8scsr.go index 5e3e44a45..8e0f0259d 100644 --- a/pkg/pki/k8scsrpki/k8scsr.go +++ b/pkg/pki/k8scsrpki/k8scsr.go @@ -18,7 +18,6 @@ import ( "github.com/banzaicloud/koperator/api/v1beta1" "github.com/banzaicloud/koperator/pkg/util/pki" - "github.com/go-logr/logr" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -35,9 +34,8 @@ type K8sCSR interface { type k8sCSR struct { client client.Client cluster *v1beta1.KafkaCluster - logger logr.Logger } -func New(client client.Client, cluster *v1beta1.KafkaCluster, logger logr.Logger) K8sCSR { - return &k8sCSR{client: client, cluster: cluster, logger: logger} +func New(client client.Client, cluster *v1beta1.KafkaCluster) K8sCSR { + return &k8sCSR{client: client, cluster: cluster} } diff --git a/pkg/pki/k8scsrpki/k8scsr_pki.go b/pkg/pki/k8scsrpki/k8scsr_pki.go index ce71dd01f..1fc20ee49 100644 --- a/pkg/pki/k8scsrpki/k8scsr_pki.go +++ b/pkg/pki/k8scsrpki/k8scsr_pki.go @@ -22,11 +22,12 @@ import ( "github.com/go-logr/logr" ) -func (c *k8sCSR) ReconcilePKI(_ context.Context, logger logr.Logger, _ map[string]v1beta1.ListenerStatusList) error { +func (c *k8sCSR) ReconcilePKI(ctx context.Context, externalHostnames map[string]v1beta1.ListenerStatusList) error { + logger := logr.FromContextOrDiscard(ctx) logger.Info("k8sCSR PKI reconcile is skipped since it is not supported yet for server certs") return nil } -func (c *k8sCSR) FinalizePKI(_ context.Context, _ logr.Logger) error { +func (c *k8sCSR) FinalizePKI(ctx context.Context) error { return nil } diff --git a/pkg/pki/k8scsrpki/k8scsr_test.go b/pkg/pki/k8scsrpki/k8scsr_test.go index 105675a59..7015550bc 100644 --- a/pkg/pki/k8scsrpki/k8scsr_test.go +++ b/pkg/pki/k8scsrpki/k8scsr_test.go @@ -20,7 +20,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/log" "github.com/banzaicloud/koperator/api/v1beta1" ) @@ -53,7 +52,7 @@ func newMockCluster() *v1beta1.KafkaCluster { } func TestNew(t *testing.T) { - pkiManager := New(&mockClient{}, newMockCluster(), log.Log) + pkiManager := New(&mockClient{}, newMockCluster()) if reflect.TypeOf(pkiManager) != reflect.TypeOf(&k8sCSR{}) { t.Error("Expected new k8sCSR from New, got:", reflect.TypeOf(pkiManager)) } diff --git a/pkg/pki/k8scsrpki/k8scsr_user.go b/pkg/pki/k8scsrpki/k8scsr_user.go index 0dd4345ae..956919a6e 100644 --- a/pkg/pki/k8scsrpki/k8scsr_user.go +++ b/pkg/pki/k8scsrpki/k8scsr_user.go @@ -21,6 +21,7 @@ import ( "fmt" "emperror.dev/errors" + "github.com/go-logr/logr" "github.com/banzaicloud/k8s-objectmatcher/patch" @@ -48,6 +49,7 @@ const ( // ReconcileUserCertificate ensures and returns a user certificate - should be idempotent func (c *k8sCSR) ReconcileUserCertificate( ctx context.Context, user *v1alpha1.KafkaUser, scheme *runtime.Scheme, _ string) (*pkicommon.UserCertificate, error) { + log := logr.FromContextOrDiscard(ctx) var clientKey []byte var signingReq *certsigningreqv1.CertificateSigningRequest secret := &corev1.Secret{} @@ -148,7 +150,7 @@ func (c *k8sCSR) ReconcileUserCertificate( // Handle case when signing request is present var foundApproved bool for _, cond := range signingReq.Status.Conditions { - c.logger.Info(fmt.Sprintf("Signing request condition is: %s", cond.Type)) + log.Info(fmt.Sprintf("Signing request condition is: %s", cond.Type)) if cond.Type == certsigningreqv1.CertificateApproved { foundApproved = true break @@ -256,21 +258,22 @@ func generateCSRResource(csr []byte, name, namespace, signerName string, } func (c *k8sCSR) generateAndCreateCSR(ctx context.Context, clientkey []byte, user *v1alpha1.KafkaUser) (*certsigningreqv1.CertificateSigningRequest, error) { - c.logger.Info("Creating PKCS1PrivateKey from secret") + log := logr.FromContextOrDiscard(ctx) + log.Info("Creating PKCS1PrivateKey from secret") block, _ := pem.Decode(clientkey) privKey, parseErr := x509.ParsePKCS1PrivateKey(block.Bytes) if parseErr != nil { return nil, parseErr } - c.logger.Info("Generating SigningRequest") + log.Info("Generating SigningRequest") csr, err := certutil.GenerateSigningRequestInPemFormat(privKey, user.GetName(), user.Spec.DNSNames) if err != nil { return nil, err } - c.logger.Info("Generating k8s csr object") + log.Info("Generating k8s csr object") signingReq := generateCSRResource(csr, user.GetName(), user.GetNamespace(), user.Spec.PKIBackendSpec.SignerName, user.Spec.GetAnnotations()) - c.logger.Info("Creating k8s csr object") + log.Info("Creating k8s csr object") if err = patch.DefaultAnnotator.SetLastAppliedAnnotation(signingReq); err != nil { return nil, errors.WrapIf(err, "could not apply last state to annotation") } diff --git a/pkg/pki/k8scsrpki/k8scsr_user_test.go b/pkg/pki/k8scsrpki/k8scsr_user_test.go index 76a317ea2..0ad0ca0f1 100644 --- a/pkg/pki/k8scsrpki/k8scsr_user_test.go +++ b/pkg/pki/k8scsrpki/k8scsr_user_test.go @@ -29,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client/fake" - "sigs.k8s.io/controller-runtime/pkg/log" "github.com/banzaicloud/istio-client-go/pkg/networking/v1alpha3" banzaiistiov1alpha1 "github.com/banzaicloud/istio-operator/api/v2/v1alpha1" @@ -96,7 +95,7 @@ func TestReconcileUserCertificate(t *testing.T) { g.Expect(err).NotTo(HaveOccurred()) fakeClient := fake.NewClientBuilder().WithScheme(sch).Build() - pkiManager := New(fakeClient, newMockCluster(), log.Log) + pkiManager := New(fakeClient, newMockCluster()) ctx := context.Background() user := createKafkaUser() _, err = pkiManager.ReconcileUserCertificate(ctx, user, sch, "") diff --git a/pkg/pki/pki_manager.go b/pkg/pki/pki_manager.go index 4e67f5524..9e38c1b21 100644 --- a/pkg/pki/pki_manager.go +++ b/pkg/pki/pki_manager.go @@ -18,7 +18,6 @@ import ( "context" "crypto/tls" - "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -33,7 +32,7 @@ import ( var MockBackend = v1beta1.PKIBackend("mock") // GetPKIManager returns a PKI/User manager interface for a given cluster -func GetPKIManager(client client.Client, cluster *v1beta1.KafkaCluster, pkiBackend v1beta1.PKIBackend, log logr.Logger) pki.Manager { +func GetPKIManager(client client.Client, cluster *v1beta1.KafkaCluster, pkiBackend v1beta1.PKIBackend) pki.Manager { var backend v1beta1.PKIBackend if pkiBackend == v1beta1.PKIBackendProvided { backend = cluster.Spec.ListenersConfig.SSLSecrets.PKIBackend @@ -47,7 +46,7 @@ func GetPKIManager(client client.Client, cluster *v1beta1.KafkaCluster, pkiBacke return certmanagerpki.New(client, cluster) // Use k8s csr api for pki backend case v1beta1.PKIBackendK8sCSR: - return k8scsrpki.New(client, cluster, log) + return k8scsrpki.New(client, cluster) // Return mock backend for testing - cannot be triggered by CR due to enum in api schema case MockBackend: return newMockPKIManager(client, cluster) @@ -69,11 +68,11 @@ func newMockPKIManager(client client.Client, cluster *v1beta1.KafkaCluster) pki. return &mockPKIManager{client: client, cluster: cluster} } -func (m *mockPKIManager) ReconcilePKI(ctx context.Context, logger logr.Logger, extListenerStatuses map[string]v1beta1.ListenerStatusList) error { +func (m *mockPKIManager) ReconcilePKI(ctx context.Context, externalHostnames map[string]v1beta1.ListenerStatusList) error { return nil } -func (m *mockPKIManager) FinalizePKI(ctx context.Context, logger logr.Logger) error { +func (m *mockPKIManager) FinalizePKI(ctx context.Context) error { return nil } diff --git a/pkg/pki/pki_manager_test.go b/pkg/pki/pki_manager_test.go index 1535d9b5f..bf13597ef 100644 --- a/pkg/pki/pki_manager_test.go +++ b/pkg/pki/pki_manager_test.go @@ -19,7 +19,6 @@ import ( "reflect" "testing" - "github.com/go-logr/logr" "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" @@ -27,8 +26,6 @@ import ( "github.com/banzaicloud/koperator/api/v1beta1" ) -var log logr.Logger - type mockClient struct { client.Client } @@ -52,7 +49,7 @@ func newMockCluster() *v1beta1.KafkaCluster { func TestGetPKIManager(t *testing.T) { cluster := newMockCluster() - mock := GetPKIManager(&mockClient{}, cluster, v1beta1.PKIBackendProvided, log) + mock := GetPKIManager(&mockClient{}, cluster, v1beta1.PKIBackendProvided) if reflect.TypeOf(mock) != reflect.TypeOf(&mockPKIManager{}) { t.Error("Expected mock client got:", reflect.TypeOf(mock)) } @@ -60,11 +57,11 @@ func TestGetPKIManager(t *testing.T) { // Test mock functions var err error - if err = mock.ReconcilePKI(ctx, log, make(map[string]v1beta1.ListenerStatusList)); err != nil { + if err = mock.ReconcilePKI(ctx, make(map[string]v1beta1.ListenerStatusList)); err != nil { t.Error("Expected nil error got:", err) } - if err = mock.FinalizePKI(ctx, log); err != nil { + if err = mock.FinalizePKI(ctx); err != nil { t.Error("Expected nil error got:", err) } @@ -82,7 +79,7 @@ func TestGetPKIManager(t *testing.T) { // Test other getters cluster.Spec.ListenersConfig.SSLSecrets.PKIBackend = v1beta1.PKIBackendCertManager - certmanager := GetPKIManager(&mockClient{}, cluster, v1beta1.PKIBackendProvided, log) + certmanager := GetPKIManager(&mockClient{}, cluster, v1beta1.PKIBackendProvided) pkiType := reflect.TypeOf(certmanager).String() expected := "*certmanagerpki.certManager" if pkiType != expected { @@ -91,7 +88,7 @@ func TestGetPKIManager(t *testing.T) { // Default should be cert-manager also cluster.Spec.ListenersConfig.SSLSecrets.PKIBackend = "" - certmanager = GetPKIManager(&mockClient{}, cluster, v1beta1.PKIBackendProvided, log) + certmanager = GetPKIManager(&mockClient{}, cluster, v1beta1.PKIBackendProvided) pkiType = reflect.TypeOf(certmanager).String() expected = "*certmanagerpki.certManager" if pkiType != expected { diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index b1a85426a..2961b5e50 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -182,7 +182,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { return errors.WrapIf(err, "could not update status for external listeners") } intListenerStatuses, controllerIntListenerStatuses := k8sutil.CreateInternalListenerStatuses(r.KafkaCluster) - err = k8sutil.UpdateListenerStatuses(context.Background(), r.Client, r.KafkaCluster, log, intListenerStatuses, extListenerStatuses) + err = k8sutil.UpdateListenerStatuses(context.Background(), r.Client, r.KafkaCluster, intListenerStatuses, extListenerStatuses) if err != nil { return errors.WrapIf(err, "failed to update listener statuses") } @@ -190,7 +190,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { // Setup the PKI if using SSL if r.KafkaCluster.Spec.ListenersConfig.SSLSecrets != nil { // reconcile the PKI - if err := pki.GetPKIManager(r.Client, r.KafkaCluster, v1beta1.PKIBackendProvided, log).ReconcilePKI(context.TODO(), log, extListenerStatuses); err != nil { + if err := pki.GetPKIManager(r.Client, r.KafkaCluster, v1beta1.PKIBackendProvided).ReconcilePKI(context.TODO(), extListenerStatuses); err != nil { return err } } diff --git a/pkg/util/pki/common.go b/pkg/util/pki/common.go index d7c8c0739..17d120983 100644 --- a/pkg/util/pki/common.go +++ b/pkg/util/pki/common.go @@ -24,7 +24,6 @@ import ( "github.com/banzaicloud/koperator/pkg/errorfactory" "github.com/banzaicloud/koperator/pkg/k8sutil" - "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -65,10 +64,10 @@ type Manager interface { // ReconcilePKI ensures a PKI for a kafka cluster - should be idempotent. // This method should at least setup any issuer needed for user certificates // as well as broker/cruise-control secrets - ReconcilePKI(ctx context.Context, logger logr.Logger, externalHostnames map[string]v1beta1.ListenerStatusList) error + ReconcilePKI(ctx context.Context, externalHostnames map[string]v1beta1.ListenerStatusList) error // FinalizePKI performs any cleanup steps necessary for a PKI backend - FinalizePKI(ctx context.Context, logger logr.Logger) error + FinalizePKI(ctx context.Context) error // ReconcileUserCertificate ensures and returns a user certificate - should be idempotent ReconcileUserCertificate(