Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use logger from context given by controller runtime wherever it's applicable #765

Merged
merged 6 commits into from
Feb 7, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions controllers/kafkacluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ var clusterUsersFinalizer = "users.kafkaclusters.kafka.banzaicloud.io"
type KafkaClusterReconciler struct {
client.Client
DirectClient client.Reader
Log logr.Logger
Namespaces []string
KafkaClientProvider kafkaclient.Provider
}
Expand All @@ -85,7 +84,7 @@ type KafkaClusterReconciler struct {
// +kubebuilder:rbac:groups=networking.istio.io,resources=*,verbs=*

func (r *KafkaClusterReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
log := r.Log.WithValues("Request.Namespace", request.NamespacedName, "Request.Name", request.Name)
log := logr.FromContextOrDiscard(ctx).WithValues("Request.Namespace", request.NamespacedName, "Request.Name", request.Name)
panyuenlau marked this conversation as resolved.
Show resolved Hide resolved

log.Info("Reconciling KafkaCluster")

Expand All @@ -104,7 +103,7 @@ func (r *KafkaClusterReconciler) Reconcile(ctx context.Context, request ctrl.Req

// Check if marked for deletion and run finalizers
if k8sutil.IsMarkedForDeletion(instance.ObjectMeta) {
return r.checkFinalizers(ctx, log, instance)
return r.checkFinalizers(ctx, instance)
}

if instance.Status.State != v1beta1.KafkaClusterRollingUpgrading {
Expand Down Expand Up @@ -192,7 +191,8 @@ func (r *KafkaClusterReconciler) Reconcile(ctx context.Context, request ctrl.Req
return reconciled()
}

func (r *KafkaClusterReconciler) checkFinalizers(ctx context.Context, log logr.Logger, cluster *v1beta1.KafkaCluster) (ctrl.Result, error) {
func (r *KafkaClusterReconciler) checkFinalizers(ctx context.Context, cluster *v1beta1.KafkaCluster) (ctrl.Result, error) {
log := logr.FromContextOrDiscard(ctx)
log.Info("KafkaCluster is marked for deletion, checking for children")

// If the main finalizer is gone then we've already finished up
Expand Down Expand Up @@ -287,7 +287,7 @@ func (r *KafkaClusterReconciler) checkFinalizers(ctx context.Context, log logr.L
// Do any necessary PKI cleanup - a PKI backend should make sure any
// user finalizations are done before it does its final cleanup
log.Info("Tearing down any PKI resources for the kafkacluster")
if err = pki.GetPKIManager(r.Client, cluster, v1beta1.PKIBackendProvided, r.Log).FinalizePKI(ctx, log); err != nil {
if err = pki.GetPKIManager(r.Client, cluster, v1beta1.PKIBackendProvided).FinalizePKI(ctx); err != nil {
switch err.(type) {
case errorfactory.ResourceNotReady:
log.Info("The PKI is not ready to be torn down")
Expand Down Expand Up @@ -349,7 +349,8 @@ func (r *KafkaClusterReconciler) updateAndFetchLatest(ctx context.Context, clust
}

// SetupKafkaClusterWithManager registers kafka cluster controller to the manager
func SetupKafkaClusterWithManager(mgr ctrl.Manager, log logr.Logger) *ctrl.Builder {
func SetupKafkaClusterWithManager(mgr ctrl.Manager) *ctrl.Builder {
log := mgr.GetLogger()
builder := ctrl.NewControllerManagedBy(mgr).
For(&v1beta1.KafkaCluster{}).Named("KafkaCluster")

Expand Down
8 changes: 4 additions & 4 deletions controllers/kafkatopic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,14 @@ type KafkaTopicReconciler struct {
// that reads objects from the cache and writes to the apiserver
Client client.Client
Scheme *runtime.Scheme
Log logr.Logger
}

// +kubebuilder:rbac:groups=kafka.banzaicloud.io,resources=kafkatopics,verbs=get;list;watch;create;update;patch;delete;deletecollection
// +kubebuilder:rbac:groups=kafka.banzaicloud.io,resources=kafkatopics/status,verbs=get;update;patch

// Reconcile reconciles the kafka topic
func (r *KafkaTopicReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
reqLogger := r.Log.WithValues("kafkatopic", request.NamespacedName, "Request.Name", request.Name)
reqLogger := logr.FromContextOrDiscard(ctx).WithValues("kafkatopic", request.NamespacedName, "Request.Name", request.Name)
panyuenlau marked this conversation as resolved.
Show resolved Hide resolved
reqLogger.Info("Reconciling KafkaTopic")
var err error

Expand Down Expand Up @@ -141,7 +140,7 @@ func (r *KafkaTopicReconciler) Reconcile(ctx context.Context, request reconcile.

// Check if marked for deletion and if so run finalizers
if k8sutil.IsMarkedForDeletion(instance.ObjectMeta) {
return r.checkFinalizers(ctx, reqLogger, broker, instance)
return r.checkFinalizers(ctx, broker, instance)
}

// Check if the topic already exists
Expand Down Expand Up @@ -226,7 +225,8 @@ func (r *KafkaTopicReconciler) updateAndFetchLatest(ctx context.Context, topic *
return topic, nil
}

func (r *KafkaTopicReconciler) checkFinalizers(ctx context.Context, reqLogger logr.Logger, broker kafkaclient.KafkaClient, topic *v1alpha1.KafkaTopic) (reconcile.Result, error) {
func (r *KafkaTopicReconciler) checkFinalizers(ctx context.Context, broker kafkaclient.KafkaClient, topic *v1alpha1.KafkaTopic) (reconcile.Result, error) {
reqLogger := logr.FromContextOrDiscard(ctx)
reqLogger.Info("Kafka topic is marked for deletion")
var err error
if util.StringSliceContains(topic.GetFinalizers(), topicFinalizer) {
Expand Down
13 changes: 7 additions & 6 deletions controllers/kafkauser_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ import (
var userFinalizer = "finalizer.kafkausers.kafka.banzaicloud.io"

// SetupKafkaUserWithManager registers KafkaUser controller to the manager
func SetupKafkaUserWithManager(mgr ctrl.Manager, certSigningEnabled bool, certManagerNamespace bool, log logr.Logger) *ctrl.Builder {
func SetupKafkaUserWithManager(mgr ctrl.Manager, certSigningEnabled bool, certManagerNamespace bool) *ctrl.Builder {
log := mgr.GetLogger()
builder := ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.KafkaUser{}).Named("KafkaUser")
if certSigningEnabled {
Expand Down Expand Up @@ -154,7 +155,6 @@ type KafkaUserReconciler struct {
// that reads objects from the cache and writes to the apiserver
Client client.Client
Scheme *runtime.Scheme
Log logr.Logger
}

// +kubebuilder:rbac:groups=kafka.banzaicloud.io,resources=kafkausers,verbs=get;list;watch;create;update;patch;delete;deletecollection
Expand All @@ -167,7 +167,7 @@ type KafkaUserReconciler struct {
// Reconcile reads that state of the cluster for a KafkaUser object and makes changes based on the state read
// and what is in the KafkaUser.Spec
func (r *KafkaUserReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
reqLogger := r.Log.WithValues("kafkauser", request.NamespacedName, "Request.Name", request.Name)
reqLogger := logr.FromContextOrDiscard(ctx).WithValues("kafkauser", request.NamespacedName, "Request.Name", request.Name)
panyuenlau marked this conversation as resolved.
Show resolved Hide resolved
reqLogger.Info("Reconciling KafkaUser")
var err error

Expand Down Expand Up @@ -212,7 +212,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, request reconcile.R
backend = v1beta1.PKIBackendProvided
}

pkiManager := pki.GetPKIManager(r.Client, cluster, backend, r.Log)
pkiManager := pki.GetPKIManager(r.Client, cluster, backend)

user, err := pkiManager.ReconcileUserCertificate(ctx, instance, r.Scheme, cluster.Spec.GetKubernetesClusterDomain())
if err != nil {
Expand Down Expand Up @@ -257,7 +257,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, request reconcile.R

// check if marked for deletion and remove kafka ACLs
if k8sutil.IsMarkedForDeletion(instance.ObjectMeta) {
return r.checkFinalizers(ctx, reqLogger, cluster, instance, kafkaUser)
return r.checkFinalizers(ctx, cluster, instance, kafkaUser)
}

// ensure a kafkaCluster label
Expand Down Expand Up @@ -324,7 +324,8 @@ func (r *KafkaUserReconciler) updateAndFetchLatest(ctx context.Context, user *v1
return user, nil
}

func (r *KafkaUserReconciler) checkFinalizers(ctx context.Context, reqLogger logr.Logger, cluster *v1beta1.KafkaCluster, instance *v1alpha1.KafkaUser, user string) (reconcile.Result, error) {
func (r *KafkaUserReconciler) checkFinalizers(ctx context.Context, cluster *v1beta1.KafkaCluster, instance *v1alpha1.KafkaUser, user string) (reconcile.Result, error) {
reqLogger := logr.FromContextOrDiscard(ctx)
// run finalizers
var err error
if util.StringSliceContains(instance.GetFinalizers(), userFinalizer) {
Expand Down
4 changes: 2 additions & 2 deletions controllers/tests/clusterregistry/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ var _ = BeforeSuite(func() {
Expect(mgr).ToNot(BeNil())

kafkaClusterReconciler = NewTestReconciler()
err = controllers.SetupKafkaClusterWithManager(mgr, logf.Log).Named("KafkaCluster").Complete(kafkaClusterReconciler)
err = controllers.SetupKafkaClusterWithManager(mgr).Named("KafkaCluster").Complete(kafkaClusterReconciler)
Expect(err).NotTo(HaveOccurred())

kafkaTopicReconciler = NewTestReconciler()
Expand All @@ -142,7 +142,7 @@ var _ = BeforeSuite(func() {

// Create a new kafka user reconciler
kafkaUserReconciler = NewTestReconciler()
err = controllers.SetupKafkaUserWithManager(mgr, true, true, logf.Log).Named("KafkaUser").Complete(kafkaUserReconciler)
err = controllers.SetupKafkaUserWithManager(mgr, true, true).Named("KafkaUser").Complete(kafkaUserReconciler)
Expect(err).NotTo(HaveOccurred())

cruiseControlTaskReconciler = NewTestReconciler()
Expand Down
7 changes: 2 additions & 5 deletions controllers/tests/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,17 +146,15 @@ var _ = BeforeSuite(func() {
kafkaClusterReconciler := controllers.KafkaClusterReconciler{
Client: mgr.GetClient(),
DirectClient: mgr.GetAPIReader(),
Log: ctrl.Log.WithName("controllers").WithName("KafkaCluster"),
KafkaClientProvider: kafkaclient.NewMockProvider(),
}

err = controllers.SetupKafkaClusterWithManager(mgr, kafkaClusterReconciler.Log).Complete(&kafkaClusterReconciler)
err = controllers.SetupKafkaClusterWithManager(mgr).Complete(&kafkaClusterReconciler)
Expect(err).NotTo(HaveOccurred())

kafkaTopicReconciler := &controllers.KafkaTopicReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Log: ctrl.Log.WithName("controllers").WithName("KafkaTopic"),
}

err = controllers.SetupKafkaTopicWithManager(mgr, 10).Complete(kafkaTopicReconciler)
Expand All @@ -166,10 +164,9 @@ var _ = BeforeSuite(func() {
kafkaUserReconciler := controllers.KafkaUserReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Log: ctrl.Log.WithName("controllers").WithName("KafkaUser"),
}

err = controllers.SetupKafkaUserWithManager(mgr, true, true, kafkaUserReconciler.Log).Complete(&kafkaUserReconciler)
err = controllers.SetupKafkaUserWithManager(mgr, true, true).Complete(&kafkaUserReconciler)
Expect(err).NotTo(HaveOccurred())

kafkaClusterCCReconciler := controllers.CruiseControlTaskReconciler{
Expand Down
7 changes: 2 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,19 +152,17 @@ func main() {
Client: mgr.GetClient(),
DirectClient: mgr.GetAPIReader(),
Namespaces: namespaceList,
Log: ctrl.Log.WithName("controllers").WithName("KafkaCluster"),
KafkaClientProvider: kafkaclient.NewDefaultProvider(),
}

if err = controllers.SetupKafkaClusterWithManager(mgr, kafkaClusterReconciler.Log).Complete(kafkaClusterReconciler); err != nil {
if err = controllers.SetupKafkaClusterWithManager(mgr).Complete(kafkaClusterReconciler); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "KafkaCluster")
os.Exit(1)
}

kafkaTopicReconciler := &controllers.KafkaTopicReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Log: ctrl.Log.WithName("controllers").WithName("KafkaTopic"),
}

if err = controllers.SetupKafkaTopicWithManager(mgr, maxKafkaTopicConcurrentReconciles).Complete(kafkaTopicReconciler); err != nil {
Expand All @@ -176,10 +174,9 @@ func main() {
kafkaUserReconciler := &controllers.KafkaUserReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Log: ctrl.Log.WithName("controllers").WithName("KafkaUser"),
}

if err = controllers.SetupKafkaUserWithManager(mgr, !certSigningDisabled, certManagerEnabled, kafkaUserReconciler.Log).Complete(kafkaUserReconciler); err != nil {
if err = controllers.SetupKafkaUserWithManager(mgr, !certSigningDisabled, certManagerEnabled).Complete(kafkaUserReconciler); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "KafkaUser")
os.Exit(1)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/k8sutil/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,9 @@ func UpdateRollingUpgradeState(c client.Client, cluster *banzaicloudv1beta1.Kafk
return nil
}

func UpdateListenerStatuses(ctx context.Context, c client.Client, cluster *banzaicloudv1beta1.KafkaCluster, logger logr.Logger,
intListenerStatuses, extListenerStatuses map[string]banzaicloudv1beta1.ListenerStatusList) error {
func UpdateListenerStatuses(ctx context.Context, c client.Client, cluster *banzaicloudv1beta1.KafkaCluster, intListenerStatuses, extListenerStatuses map[string]banzaicloudv1beta1.ListenerStatusList) error {
logger := logr.FromContextOrDiscard(ctx)

typeMeta := cluster.TypeMeta

cluster.Status.ListenerStatuses = banzaicloudv1beta1.ListenerStatuses{
Expand Down
2 changes: 1 addition & 1 deletion pkg/kafkaclient/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func ClusterConfig(client client.Client, cluster *v1beta1.KafkaCluster) (*KafkaC
if cluster.Spec.GetClientSSLCertSecretName() != "" {
tlsConfig, err = util.GetClientTLSConfig(client, types.NamespacedName{Name: cluster.Spec.GetClientSSLCertSecretName(), Namespace: cluster.Namespace})
} else if cluster.Spec.ListenersConfig.SSLSecrets != nil {
tlsConfig, err = pki.GetPKIManager(client, cluster, v1beta1.PKIBackendProvided, log).GetControllerTLSConfig()
tlsConfig, err = pki.GetPKIManager(client, cluster, v1beta1.PKIBackendProvided).GetControllerTLSConfig()
} else {
err = errors.New("either 'clientSSLCertSecret' or 'sslSecrets' must be specified as internal listener used for inner communication uses SSL")
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/pki/certmanagerpki/certmanager_pki.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ import (
pkicommon "github.com/banzaicloud/koperator/pkg/util/pki"
)

func (c *certManager) FinalizePKI(ctx context.Context, logger logr.Logger) error {
func (c *certManager) FinalizePKI(ctx context.Context) error {
logger := logr.FromContextOrDiscard(ctx)
logger.Info("Removing cert-manager certificates and secrets")

// Safety check that we are actually doing something
Expand Down Expand Up @@ -87,7 +88,8 @@ func (c *certManager) FinalizePKI(ctx context.Context, logger logr.Logger) error
return nil
}

func (c *certManager) ReconcilePKI(ctx context.Context, logger logr.Logger, extListenerStatuses map[string]v1beta1.ListenerStatusList) (err error) {
func (c *certManager) ReconcilePKI(ctx context.Context, extListenerStatuses map[string]v1beta1.ListenerStatusList) (err error) {
logger := logr.FromContextOrDiscard(ctx)
logger.Info("Reconciling cert-manager PKI")

resources, err := c.kafkapki(ctx, extListenerStatuses)
Expand All @@ -96,7 +98,7 @@ func (c *certManager) ReconcilePKI(ctx context.Context, logger logr.Logger, extL
}

for _, o := range resources {
if err := reconcile(ctx, logger, c.client, o); err != nil {
if err := reconcile(ctx, c.client, o); err != nil {
return err
}
}
Expand Down
15 changes: 6 additions & 9 deletions pkg/pki/certmanagerpki/certmanager_pki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"testing"

corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/banzaicloud/koperator/api/v1alpha1"
"github.com/banzaicloud/koperator/api/v1beta1"
Expand All @@ -30,8 +29,6 @@ import (
pkicommon "github.com/banzaicloud/koperator/pkg/util/pki"
)

var log = ctrl.Log.WithName("testing")

const (
testNamespace = "test-namespace"
)
Expand Down Expand Up @@ -93,7 +90,7 @@ func TestFinalizePKI(t *testing.T) {
t.Error("Expected no error during initialization, got:", err)
}

if err := manager.FinalizePKI(context.Background(), log); err != nil {
if err := manager.FinalizePKI(context.Background()); err != nil {
t.Error("Expected no error on finalize, got:", err)
}
}
Expand All @@ -109,7 +106,7 @@ func TestReconcilePKI(t *testing.T) {
if err := manager.client.Create(ctx, newServerSecret()); err != nil {
t.Error("error during server secret creation", reflect.TypeOf(err))
}
if err := manager.ReconcilePKI(ctx, log, make(map[string]v1beta1.ListenerStatusList)); err != nil {
if err := manager.ReconcilePKI(ctx, make(map[string]v1beta1.ListenerStatusList)); err != nil {
if reflect.TypeOf(err) != reflect.TypeOf(errorfactory.ResourceNotReady{}) {
t.Error("Expected not ready error, got:", reflect.TypeOf(err))
}
Expand All @@ -118,7 +115,7 @@ func TestReconcilePKI(t *testing.T) {
if err := manager.client.Create(ctx, newControllerSecret()); err != nil {
t.Error("error during controller secret creation", reflect.TypeOf(err))
}
if err := manager.ReconcilePKI(ctx, log, make(map[string]v1beta1.ListenerStatusList)); err != nil {
if err := manager.ReconcilePKI(ctx, make(map[string]v1beta1.ListenerStatusList)); err != nil {
if reflect.TypeOf(err) != reflect.TypeOf(errorfactory.ResourceNotReady{}) {
t.Error("Expected not ready error, got:", reflect.TypeOf(err))
}
Expand All @@ -127,7 +124,7 @@ func TestReconcilePKI(t *testing.T) {
if err := manager.client.Create(ctx, newCASecret()); err != nil {
t.Error("error during CA secret creation", reflect.TypeOf(err))
}
if err := manager.ReconcilePKI(ctx, log, make(map[string]v1beta1.ListenerStatusList)); err != nil {
if err := manager.ReconcilePKI(ctx, make(map[string]v1beta1.ListenerStatusList)); err != nil {
t.Error("Expected successful reconcile, got:", err)
}

Expand All @@ -136,15 +133,15 @@ func TestReconcilePKI(t *testing.T) {
if err != nil {
t.Error("Expected no error during mocking the cluster, got:", err)
}
if err := manager.ReconcilePKI(ctx, log, make(map[string]v1beta1.ListenerStatusList)); err == nil {
if err := manager.ReconcilePKI(ctx, make(map[string]v1beta1.ListenerStatusList)); err == nil {
t.Error("Expected error got nil")
} else if reflect.TypeOf(err) != reflect.TypeOf(errorfactory.ResourceNotReady{}) {
t.Error("Expected not ready error, got:", reflect.TypeOf(err))
}
if err := manager.client.Create(ctx, newPreCreatedSecret()); err != nil {
t.Error("error during pre created secret creation", reflect.TypeOf(err))
}
if err := manager.ReconcilePKI(ctx, log, make(map[string]v1beta1.ListenerStatusList)); err != nil {
if err := manager.ReconcilePKI(ctx, make(map[string]v1beta1.ListenerStatusList)); err != nil {
t.Error("Expected successful reconcile, got:", err)
}
}
Loading