Allow creating KafkaTopic CR for topic that already present on the Kafka cluster #934

Merged
merged 10 commits into from
Mar 1, 2023

Contributor

@bartam1 bartam1 commented Feb 17, 2023

Q | A
Bug fix? | no
New feature? | yes
API breaks? | no
Deprecations? | no
License | Apache 2.0

What's in this PR?

This PR allows creating a KafkaTopic CR when the referenced topic is already present on the Kafka cluster.
Creation is allowed when the KafkaTopic CR:

  • has the managedBy: koperator annotation
  • has the same configuration as the existing Kafka topic

Otherwise the validation webhook rejects the KafkaTopic CR.
There is a new status field, "managedBy", whose value is the manager of the Kafka topic.

When the KafkaTopic CR has a "managedBy" annotation with a value other than "koperator" and the KafkaTopic CR is deleted or modified, the existing Kafka topic in the Kafka cluster is not affected.

Why?

Lets users create KafkaTopic resources for topics that already exist on the Kafka cluster.
Users can then modify the configuration and partition count of such a topic through the created KafkaTopic CR.

Additional context

Checklist

  • Implementation tested
  • Error handling code meets the guideline
  • Logging code meets the guideline
  • User guide and development docs updated (if needed)

More details

When a KafkaTopic is rejected, the error message looks like the following example:

The KafkaTopic "ccmetrics" is invalid:
* spec.name: Invalid value: "__CruiseControlMetrics": topic "__CruiseControlMetrics" already exists on kafka cluster and it is not managed by Koperator,
					when you want to be managed by Koperator to be able to modify its configuration through KafkaTopic CR,
					add this "managedBy: koperator" annotation for this KafkaTopic CR
* spec.partitions: Invalid value: 3: initial KafkaTopic partition number must be the same as the already exist kafka topic has  (given: 3 present: 12)
* spec.replicationfactor: Invalid value: 3: initial KafkaTopic replication factor must be the same as the already exist kafka topic has (given: 3 present: 2)
* spec.config: Invalid value: 3: initial KafkaTopic configuration must be the same as the already exist kafka topic configuration (difference:   map[string]*string{
  	"cleanup.policy":      &"delete",
- 	"min.insync.replicas": &"1",
- 	"retention.ms":        &"18000000",
+ 	"retention.ms":        &"604800000",
  }

@bartam1 bartam1 marked this pull request as ready for review February 18, 2023 14:27
@bartam1 bartam1 requested a review from a team as a code owner February 18, 2023 14:27
@@ -40,7 +40,8 @@ type KafkaTopicSpec struct {
 // KafkaTopicStatus defines the observed state of KafkaTopic
 // +k8s:openapi-gen=true
 type KafkaTopicStatus struct {
-	State TopicState `json:"state"`
+	State     TopicState `json:"state"`
+	ManagedBy string     `json:"managedBy"`
Member

Question: What is the reason we are making the managedBy part of the status in addition to using it as an annotation? (Looks like double booking to me, but I'm mostly just curious, not debating it.)

Contributor Author

When there is no annotation, the referenced Kafka topic is managed by Koperator.
I think it is good to also show the manager in the topic status.
This confirms to users who manages the topic when they check the topic status.

Contributor

Since we're handling this in annotations everywhere else wouldn't it make more sense to add an annotation instead?

@@ -55,22 +63,22 @@ func (s KafkaTopicValidator) ValidateDelete(ctx context.Context, obj runtime.Obj

 func (s *KafkaTopicValidator) validate(ctx context.Context, obj runtime.Object) error {
 	kafkaTopic := obj.(*banzaicloudv1alpha1.KafkaTopic)
-	log := s.Log.WithValues("name", kafkaTopic.GetName(), "namespace", kafkaTopic.GetNamespace())
-	fieldErrs, err := s.validateKafkaTopic(ctx, kafkaTopic, log)
+	s.Log = s.Log.WithValues("name", kafkaTopic.GetName(), "namespace", kafkaTopic.GetNamespace())
Member

Question: Are we sure we want to overwrite the stored logger here? For different topics this would be different and in that case the local instance would be better IMO.

Contributor Author

I think this is OK because the KafkaTopic controller runs sequentially, not in parallel.

Member

I can let this fly, but the stored validator logger will still point to some topic after the validation has already run, which is IMO misleading because the validator handles multiple topics.
(Practically the result is the same, but conceptually we are abusing implementation details instead of doing this cleanly.)

Contributor Author

Agreed. Thank you for bringing this up. I will fix this.
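
The difference discussed in this thread can be illustrated with a minimal sketch. It is not the Koperator code: it assumes Go 1.21+ and swaps in the standard library's log/slog for logr, and topicValidator, validateShared, validateLocal and lastLineNameCount are hypothetical names. It shows that overwriting the stored logger accumulates key/value pairs from earlier topics, while a call-scoped logger does not.

```go
package main

import (
	"bytes"
	"fmt"
	"log/slog"
	"strings"
)

// topicValidator is a hypothetical stand-in for KafkaTopicValidator.
type topicValidator struct {
	Log *slog.Logger
}

// validateShared overwrites the stored logger, so key/value pairs from
// earlier topics leak into the log lines of later topics.
func (v *topicValidator) validateShared(name string) {
	v.Log = v.Log.With("name", name)
	v.Log.Info("validating")
}

// validateLocal derives a call-scoped logger and leaves v.Log untouched.
func (v *topicValidator) validateLocal(name string) {
	log := v.Log.With("name", name)
	log.Info("validating")
}

// lastLineNameCount validates two topics and reports how many "name="
// attributes the log line of the second topic carries.
func lastLineNameCount(useLocal bool) int {
	var buf bytes.Buffer
	v := &topicValidator{Log: slog.New(slog.NewTextHandler(&buf, nil))}
	for _, name := range []string{"topic-a", "topic-b"} {
		if useLocal {
			v.validateLocal(name)
		} else {
			v.validateShared(name)
		}
	}
	lines := strings.Split(strings.TrimSpace(buf.String()), "\n")
	return strings.Count(lines[len(lines)-1], "name=")
}

func main() {
	fmt.Println("shared logger:", lastLineNameCount(false))
	fmt.Println("local logger: ", lastLineNameCount(true))
}
```

With the stored logger overwritten, the second log line carries the names of both topics; with the local logger it carries only its own.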

Comment on lines 179 to 193
if existing.NumPartitions != topic.Spec.Partitions {
	allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("partitions"), topic.Spec.Partitions,
		fmt.Sprintf(`initial KafkaTopic partition number must be the same as the already exist kafka topic has (given: %v present: %v)`, topic.Spec.Partitions, existing.NumPartitions)))
}
if existing.ReplicationFactor != int16(topic.Spec.ReplicationFactor) {
	allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("replicationfactor"), topic.Spec.ReplicationFactor,
		fmt.Sprintf(`initial KafkaTopic replication factor must be the same as the already exist kafka topic has (given: %v present: %v)`, topic.Spec.ReplicationFactor, existing.ReplicationFactor)))
}

if diff := cmp.Diff(existing.ConfigEntries, util.MapStringStringPointer(topic.Spec.Config), cmpopts.EquateEmpty()); diff != "" {
	allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("config"), topic.Spec.Partitions,
		fmt.Sprintf(`initial KafkaTopic configuration must be the same as the already exist kafka topic configuration difference: %s`, diff)))
}
Member

Note: I have been thinking how we could make this less fragile on newly added fields and also on structure, but I couldn't come up with a solution that keeps the information of the fields.
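
For readers who want to try the comparison outside the webhook, here is a simplified, standard-library-only sketch of the same idea. It is an illustration under assumptions, not the PR's implementation: existingTopic, topicSpec and compareWithExisting are hypothetical names, and a hand-rolled key comparison replaces cmp.Diff. The sample values are taken from the example error message in the PR description.

```go
package main

import (
	"fmt"
	"sort"
)

// existingTopic and topicSpec are hypothetical, simplified stand-ins for
// the topic details read from Kafka and for the KafkaTopic spec.
type existingTopic struct {
	NumPartitions     int32
	ReplicationFactor int16
	Config            map[string]string
}

type topicSpec struct {
	Partitions        int32
	ReplicationFactor int32
	Config            map[string]string
}

// compareWithExisting returns one message per spec field that differs from
// the topic already present on the cluster. Nil and empty config maps are
// treated as equal.
func compareWithExisting(existing existingTopic, spec topicSpec) []string {
	var msgs []string
	if existing.NumPartitions != spec.Partitions {
		msgs = append(msgs, fmt.Sprintf("spec.partitions (given: %d present: %d)",
			spec.Partitions, existing.NumPartitions))
	}
	if int32(existing.ReplicationFactor) != spec.ReplicationFactor {
		msgs = append(msgs, fmt.Sprintf("spec.replicationfactor (given: %d present: %d)",
			spec.ReplicationFactor, existing.ReplicationFactor))
	}

	// Collect every key that is missing on one side or has a different value.
	keys := map[string]struct{}{}
	for k := range existing.Config {
		keys[k] = struct{}{}
	}
	for k := range spec.Config {
		keys[k] = struct{}{}
	}
	var differing []string
	for k := range keys {
		ev, eok := existing.Config[k]
		sv, sok := spec.Config[k]
		if eok != sok || ev != sv {
			differing = append(differing, k)
		}
	}
	sort.Strings(differing)
	if len(differing) > 0 {
		msgs = append(msgs, fmt.Sprintf("spec.config (differing keys: %v)", differing))
	}
	return msgs
}

func main() {
	existing := existingTopic{NumPartitions: 12, ReplicationFactor: 2, Config: map[string]string{
		"cleanup.policy": "delete", "min.insync.replicas": "1", "retention.ms": "18000000"}}
	given := topicSpec{Partitions: 3, ReplicationFactor: 3, Config: map[string]string{
		"cleanup.policy": "delete", "retention.ms": "604800000"}}
	for _, msg := range compareWithExisting(existing, given) {
		fmt.Println(msg)
	}
}
```

Like the code under review, this sketch has to name each compared field explicitly, so it shares the fragility mentioned in the note above.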

@bartam1 bartam1 requested a review from pregnor February 21, 2023 14:11
Contributor

@Kuvesz Kuvesz left a comment

I left a couple of suggestions and questions.

@bartam1 bartam1 requested a review from Kuvesz February 22, 2023 12:17
Kuvesz previously approved these changes Feb 23, 2023
Contributor

@Kuvesz Kuvesz left a comment

I can accept the managedBy status field, though I'm not a huge fan of it. :D Just left a quick question and a suggestion on a typo.

@@ -41,6 +41,10 @@ type KafkaTopicSpec struct {
// +k8s:openapi-gen=true
type KafkaTopicStatus struct {
State TopicState `json:"state"`
// ManagedBy describes who is the manager of the Kafka topic.
// When its value is not 'koperator' then modifications to the KafkaTopic CR's topic configurations will be unaffected on the Kafka topic.
Contributor

Sorry, I'm a bit confused by this still. What happens if the annotation is not set at all? Do we assume it's managed by koperator or not? I seem to remember a comment (though I can't find it) that said that previously.

Contributor Author

Yes, when there is no annotation, the topic is managed by koperator.
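
The default described in this answer could be expressed roughly as follows. This is a sketch under assumptions, not the controller code from the PR: resolveManager and propagateChanges are hypothetical helpers, and the case-insensitive comparison mirrors the strings.ToLower check that the webhook uses.

```go
package main

import (
	"fmt"
	"strings"
)

const (
	managedByKey       = "managedBy" // annotation key used in this PR
	managedByKoperator = "koperator"
)

// resolveManager returns the manager to report in the status: the annotation
// value when the annotation is present, "koperator" when it is absent.
func resolveManager(annotations map[string]string) string {
	if manager, ok := annotations[managedByKey]; ok {
		return manager
	}
	return managedByKoperator
}

// propagateChanges reports whether changes to the KafkaTopic CR should be
// applied to the Kafka topic (assumption: only when koperator is the manager).
func propagateChanges(annotations map[string]string) bool {
	return strings.EqualFold(resolveManager(annotations), managedByKoperator)
}

func main() {
	fmt.Println(resolveManager(nil), propagateChanges(nil))
	other := map[string]string{managedByKey: "another-tool"}
	fmt.Println(resolveManager(other), propagateChanges(other))
}
```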

@@ -70,7 +79,7 @@ func (s *KafkaTopicValidator) validate(ctx context.Context, obj runtime.Object)
 		kafkaTopic.Name, fieldErrs)
 }

-func (s *KafkaTopicValidator) validateKafkaTopic(ctx context.Context, topic *banzaicloudv1alpha1.KafkaTopic, log logr.Logger) (field.ErrorList, error) {
+func (s *KafkaTopicValidator) validateKafkaTopic(ctx context.Context, log logr.Logger, topic *banzaicloudv1alpha1.KafkaTopic) (field.ErrorList, error) {
Member

One suggestion: it's probably better to get the logger from the context (following what we have in the existing codebase) instead of passing it around as an argument. I believe passing the logger through the context is recommended.

Contributor Author

I also think that passing the logger as argument is recommended and not to use ctx to get the logger.
When writing tests, you can probably just pass log.Discard() as the argument to satisfy the dependency.

Member

I also think that passing the logger as argument is recommended and not to use ctx to get the logger.

Oh really? Can you provide a reference?

I thought it was the other way around, since there was an onboarding task (#765) that I did a long time ago to replace the logger argument with the one obtained from the context.

If it is indeed recommended to pass the logger as an argument, we might need to revert that PR.

// Comparing KafkaTopic configuration with the existing
if existing.NumPartitions != topic.Spec.Partitions {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("partitions"), topic.Spec.Partitions,
fmt.Sprintf(`initial KafkaTopic partition number must be the same as what the existing kafka topic has (given: %v present: %v)`, topic.Spec.Partitions, existing.NumPartitions)))
Member

I am a bit confused about this "initial" wording -- it is leading me to the partition number that the users specified when the kafka topic was initially created. But we are actually comparing the new partition number against the existing value

Same confusion while reading the other "initial" messages in this function

Contributor Author

I have changed it. What do you think? Is it clear now?

Member

@panyuenlau panyuenlau Feb 27, 2023

Perhaps having the following is clear enough (I just removed the "initially" from the log):

"when creating KafkaTopic CR for existing topic, its partition number must be the same as what the existing kafka topic has..."

What do you think?

Also, the "when" shouldn't be capitalized because it is an error string, see https://github.com/golang/go/wiki/CodeReviewComments#error-strings

pregnor previously approved these changes Feb 27, 2023
Member

@pregnor pregnor left a comment

LGTM with 1 optional suggestion on rewording a comment.

@@ -41,6 +41,10 @@ type KafkaTopicSpec struct {
// +k8s:openapi-gen=true
type KafkaTopicStatus struct {
State TopicState `json:"state"`
// ManagedBy describes who is the manager of the Kafka topic.
// When its value is not "koperator" then modifications to the topic configurations of the KafkaTopic CR will be unaffected on the Kafka topic.
Member

Optional: The wording MAY be improved.

Suggested change
- // When its value is not "koperator" then modifications to the topic configurations of the KafkaTopic CR will be unaffected on the Kafka topic.
+ // When its value is not "koperator" then modifications to the topic configurations of the KafkaTopic CR will not be propagated to the Kafka topic and the CR will be synchronized back to the Kafka topic eventually.

Note: correct me if the part after the "and" keyword would not be entirely correct.

Contributor Author

There will be no synchronization.

 }
-return nil, nil
+return allErrs, nil
Member

Note: It would be great to refactor the code duplications, but it should be a separate effort.

Kuvesz previously approved these changes Feb 27, 2023
@@ -173,22 +173,22 @@ func (s *KafkaTopicValidator) checkKafka(ctx context.Context, topic *banzaicloud
 if manager, ok := topic.GetAnnotations()[TopicManagedByAnnotationKey]; !ok || strings.ToLower(manager) != TopicManagedByKoperatorAnnotationValue {
 	allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("name"), topic.Spec.Name,
 		fmt.Sprintf(`topic "%s" already exists on kafka cluster and it is not managed by Koperator,
-if you want it to be managed by Koperator making you able to modify its configuration through a KafkaTopic CR,
+if you want it to be managed by Koperator so you can modify its configurations through a KafkaTopic CR,,
Contributor

Suggested change
- if you want it to be managed by Koperator so you can modify its configurations through a KafkaTopic CR,,
+ if you want it to be managed by Koperator so you can modify its configurations through a KafkaTopic CR,

@bartam1 bartam1 force-pushed the allowexistingtopic branch from 119347e to 5240f85 Compare March 1, 2023 10:29

CLAassistant commented Mar 1, 2023

CLA assistant check
All committers have signed the CLA.

Member

@pregnor pregnor left a comment

LGTM with 1 comment

@@ -3907,6 +3907,21 @@ spec:
external listener advertise address according to the description
of the "hostnameOverride" field.
type: object
nodePortNodeAddressType:
Member

Question: are we sure this should be here?
I think this should only be in your other PR.

@bartam1 bartam1 merged commit 10dfa06 into master Mar 1, 2023
@bartam1 bartam1 deleted the allowexistingtopic branch March 1, 2023 12:13
for _, testCase := range testCases {
	t.Run(testCase.testName, func(t *testing.T) {
		t.Parallel()
		fieldErrorList, err := kafkaTopicValidator.checkKafka(context.Background(), &testCase.kafkaTopic, cluster)
Contributor

Does this section of code suffer from the "loop var + closures + goroutines" gotcha in Go?

I tried adding a t.Log(testCase) beneath this line, and the output always has testName set to "topic configuration is different and managedBy koperator" (the last test case).

=== NAME  TestCheckKafkaTopicExist/topic_configuration_is_same_and_managedBy_koperator
    kafkatopic_validator_test.go:204: {topic configuration is different and managedBy koperator {{ } {      0 {{0 0 <nil>}} <nil> <nil> map[] map[managedBy:koperator] [] [] []} {test-topic 2 1 map[testConfKey:testConfVal1] { }} { }} [its configuration must be]}
=== NAME  TestCheckKafkaTopicExist/topic_replication_factor_is_different_and_managedBy_koperator
    kafkatopic_validator_test.go:204: {topic configuration is different and managedBy koperator {{ } {      0 {{0 0 <nil>}} <nil> <nil> map[] map[managedBy:koperator] [] [] []} {test-topic 2 1 map[testConfKey:testConfVal1] { }} { }} [its configuration must be]}
=== NAME  TestCheckKafkaTopicExist/topic_partition_and_replication_is_different_and_not_managedBy_koperator
    kafkatopic_validator_test.go:204: {topic configuration is different and managedBy koperator {{ } {      0 {{0 0 <nil>}} <nil> <nil> map[] map[managedBy:koperator] [] [] []} {test-topic 2 1 map[testConfKey:testConfVal1] { }} { }} [its configuration must be]}

Member

Looks to me like a proper full name.
TestCheckKafkaTopicExist/topic_configuration_is_same_and_managedBy_koperator
Not sure what you mean exactly.

Contributor

@mihaialexandrescu mihaialexandrescu Mar 2, 2023

t.Run(testCase.testName, func(t *testing.T) { copies testCase.testName (it is passed by value at the function call), but the later uses of the "full" testCase inside the closure are not copies: they reference the loop variable, which already holds the last test case by the time the parallel subtests run.

The name in the log message differs from the subtest name, which demonstrates the problem:

fieldErrorList, err := kafkaTopicValidator.checkKafka(context.Background(), &testCase.kafkaTopic, cluster)
...
t.Log("The name is: " + testCase.testName)

This results in the following output (in both instances, the subtest name on the first line and the name in our log message on the second line differ):

=== CONT  TestCheckKafkaTopicExist/topic_partition_is_different_and_managedBy_koperator
    kafkatopic_validator_test.go:202: The name is: topic configuration is different and managedBy koperator

=== NAME  TestCheckKafkaTopicExist/topic_configuration_is_same_and_managedBy_koperator
    kafkatopic_validator_test.go:202: The name is: topic configuration is different and managedBy koperator 

Member

Oh, I see, I think after the for there should be a

testCase := testCase

line which fixes this.
(I use a linter that alerts on this.)

Thanks for catching this, Mihai.
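
The fix proposed above can be sketched without the testing package. This is a minimal illustration with hypothetical names (testCase, namesSeenByClosures): the closures are collected first and run only after the loop has finished, which is roughly what happens to subtests that call t.Parallel(). Note that since Go 1.22 each loop iteration gets its own variable, so the explicit copy is only needed for modules targeting older Go versions; it stays harmless on newer ones.

```go
package main

import "fmt"

// testCase is a hypothetical, reduced version of the table-test entry.
type testCase struct {
	testName string
}

// namesSeenByClosures builds one closure per test case and runs them all
// only after the loop has finished, similar to parallel subtests that are
// resumed once the parent test function returns.
func namesSeenByClosures(cases []testCase) []string {
	var pending []func() string
	for _, tc := range cases {
		tc := tc // per-iteration copy; required before Go 1.22
		pending = append(pending, func() string { return tc.testName })
	}

	names := make([]string, 0, len(pending))
	for _, run := range pending {
		names = append(names, run())
	}
	return names
}

func main() {
	cases := []testCase{
		{testName: "partition differs"},
		{testName: "replication differs"},
		{testName: "config differs"},
	}
	fmt.Println(namesSeenByClosures(cases))
}
```

With the copy in place, every closure reports its own test case name regardless of the Go version used to build it.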
