Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow creating KafkaTopic CR for topic that already present on the Kafka cluster #914

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
<p align="center">

<a href="https://hub.docker.com/r/banzaicloud/kafka-operator/">
<img src="https://img.shields.io/docker/cloud/automated/banzaicloud/kafka-operator.svg" alt="Docker Automated build">
</a>
![Koperator](https://img.shields.io/github/v/release/banzaicloud/koperator?label=Koperator&sort=semver)
![Released](https://img.shields.io/github/release-date/banzaicloud/koperator?label=Released)
![License](https://img.shields.io/github/license/banzaicloud/koperator?label=License)
![Go version (latest release)](https://img.shields.io/github/go-mod/go-version/banzaicloud/koperator/v0.22.0)

<a href="https://circleci.com/gh/banzaicloud/koperator">
<img src="https://circleci.com/gh/banzaicloud/koperator/tree/master.svg?style=shield" alt="CircleCI">
</a>
</p>

---

<a href="https://goreportcard.com/report/github.com/banzaicloud/koperator">
<img src="https://goreportcard.com/badge/github.com/banzaicloud/koperator" alt="Go Report Card">
</a>
<p align="center">

<a href="https://github.com/banzaicloud/koperator/">
<img src="https://img.shields.io/badge/license-Apache%20v2-orange.svg" alt="license">
</a>
![Go version](https://img.shields.io/github/go-mod/go-version/banzaicloud/koperator/master)
[![Go Report Card](https://goreportcard.com/badge/github.com/banzaicloud/koperator)](https://goreportcard.com/report/github.com/banzaicloud/koperator)
![CI](https://img.shields.io/github/actions/workflow/status/banzaicloud/koperator/ci.yml?branch=master&label=CI)
![Image](https://img.shields.io/github/actions/workflow/status/banzaicloud/koperator/docker.yml?branch=master&label=Image)
![Image (perf test)](https://img.shields.io/github/actions/workflow/status/banzaicloud/koperator/docker_perf_test_load.yml?branch=master&label=Image%20%28perf%20test%29)
![Helm chart](https://img.shields.io/github/actions/workflow/status/banzaicloud/koperator/helm.yml?branch=master&label=Helm%20chart)

</p>

Expand Down
3 changes: 3 additions & 0 deletions api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ type DisruptionBudgetWithStrategy struct {

// Broker defines the broker basic configuration
type Broker struct {
// +kubebuilder:validation:Minimum=0
// +kubebuilder:validation:Maximum=65535
// +kubebuilder:validation:ExclusiveMaximum=true
Id int32 `json:"id"`
BrokerConfigGroup string `json:"brokerConfigGroup,omitempty"`
ReadOnlyConfig string `json:"readOnlyConfig,omitempty"`
Expand Down
4 changes: 2 additions & 2 deletions charts/kafka-operator/Chart.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: v1
name: kafka-operator
version: 0.23.0-dev.0
version: 0.23.0
description: kafka-operator manages Kafka deployments on Kubernetes
sources:
- https://github.com/banzaicloud/koperator
appVersion: v0.23.0-dev.0
appVersion: v0.23.0
2 changes: 1 addition & 1 deletion charts/kafka-operator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ The following table lists the configurable parameters of the Banzaicloud Kafka O
Parameter | Description | Default
--------- | ----------- | -------
`operator.image.repository` | Operator container image repository | `ghcr.io/banzaicloud/kafka-operator`
`operator.image.tag` | Operator container image tag | `v0.22.0`
`operator.image.tag` | Operator container image tag | `v0.23.0`
`operator.image.pullPolicy` | Operator container image pull policy | `IfNotPresent`
`operator.serviceAccount.name` | ServiceAccount used by the operator pod | `kafka-operator`
`operator.serviceAccount.create` | If true, create the `operator.serviceAccount.name` service account | `true`
Expand Down
3 changes: 3 additions & 0 deletions charts/kafka-operator/templates/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12456,7 +12456,10 @@ spec:
brokerConfigGroup:
type: string
id:
exclusiveMaximum: true
format: int32
maximum: 65535
minimum: 0
type: integer
readOnlyConfig:
type: string
Expand Down
3 changes: 3 additions & 0 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12293,7 +12293,10 @@ spec:
brokerConfigGroup:
type: string
id:
exclusiveMaximum: true
format: int32
maximum: 65535
minimum: 0
type: integer
readOnlyConfig:
type: string
Expand Down
8 changes: 4 additions & 4 deletions controllers/controller_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,18 @@ func clusterLabelString(cluster *v1beta1.KafkaCluster) string {
// checkBrokerConnectionError is a convenience wrapper for returning from common
// broker connection errors
func checkBrokerConnectionError(logger logr.Logger, err error) (ctrl.Result, error) {
switch errors.Cause(err).(type) {
case errorfactory.BrokersUnreachable:
switch {
case errors.As(err, &errorfactory.BrokersUnreachable{}):
return ctrl.Result{
Requeue: true,
RequeueAfter: time.Duration(15) * time.Second,
}, nil
case errorfactory.BrokersNotReady:
case errors.As(err, &errorfactory.BrokersNotReady{}):
return ctrl.Result{
Requeue: true,
RequeueAfter: time.Duration(15) * time.Second,
}, nil
case errorfactory.ResourceNotReady:
case errors.As(err, &errorfactory.ResourceNotReady{}):
logger.Info("Needed resource for broker connection not found, may not be ready")
return ctrl.Result{
Requeue: true,
Expand Down
3 changes: 1 addition & 2 deletions controllers/cruisecontroloperation_ttl_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/banzaicloud/koperator/api/v1alpha1"
banzaiv1alpha1 "github.com/banzaicloud/koperator/api/v1alpha1"
)

Expand Down Expand Up @@ -108,7 +107,7 @@ func IsExpired(ttl time.Duration, finishedAt time.Time) bool {
return time.Since(finishedAt) > ttl
}

func (r *CruiseControlOperationTTLReconciler) delete(ctx context.Context, ccOperation *v1alpha1.CruiseControlOperation) (reconcile.Result, error) {
func (r *CruiseControlOperationTTLReconciler) delete(ctx context.Context, ccOperation *banzaiv1alpha1.CruiseControlOperation) (reconcile.Result, error) {
log := logr.FromContextOrDiscard(ctx)
err := r.Delete(ctx, ccOperation)
if err != nil && !apierrors.IsNotFound(err) {
Expand Down
3 changes: 1 addition & 2 deletions controllers/cruisecontroltask_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (

apiutil "github.com/banzaicloud/koperator/api/util"
banzaiv1alpha1 "github.com/banzaicloud/koperator/api/v1alpha1"
"github.com/banzaicloud/koperator/api/v1beta1"
banzaiv1beta1 "github.com/banzaicloud/koperator/api/v1beta1"
koperatorccconf "github.com/banzaicloud/koperator/pkg/resources/cruisecontrol"
"github.com/banzaicloud/koperator/pkg/scale"
Expand Down Expand Up @@ -370,7 +369,7 @@ func brokersJBODSelector(brokerIDs []string, capacityConfigJSON string) (brokers
continue
}

brokerId, ok, err := unstructured.NestedString(brokerCapacityMap, v1beta1.BrokerIdLabelKey)
brokerId, ok, err := unstructured.NestedString(brokerCapacityMap, banzaiv1beta1.BrokerIdLabelKey)
if err != nil {
return nil, nil, errors.WrapIfWithDetails(err,
"could not retrieve broker Id from broker capacity configuration",
Expand Down
24 changes: 14 additions & 10 deletions controllers/kafkacluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,44 +124,48 @@ func (r *KafkaClusterReconciler) Reconcile(ctx context.Context, request ctrl.Req
for _, rec := range reconcilers {
err = rec.Reconcile(log)
if err != nil {
switch errors.Cause(err).(type) {
case errorfactory.BrokersUnreachable:
switch {
case errors.As(err, &errorfactory.BrokersUnreachable{}):
log.Info("Brokers unreachable, may still be starting up", "error", err.Error())
return ctrl.Result{
RequeueAfter: time.Duration(15) * time.Second,
}, nil
case errorfactory.BrokersNotReady:
case errors.As(err, &errorfactory.BrokersNotReady{}):
log.Info("Brokers not ready, may still be starting up", "error", err.Error())
return ctrl.Result{
RequeueAfter: time.Duration(15) * time.Second,
}, nil
case errorfactory.ResourceNotReady:
case errors.As(err, &errorfactory.ResourceNotReady{}):
log.Info("A new resource was not found or may not be ready", "error", err.Error())
return ctrl.Result{
RequeueAfter: time.Duration(7) * time.Second,
}, nil
case errorfactory.ReconcileRollingUpgrade:
case errors.As(err, &errorfactory.ReconcileRollingUpgrade{}):
log.Info("Rolling Upgrade in Progress")
return ctrl.Result{
RequeueAfter: time.Duration(15) * time.Second,
}, nil
case errorfactory.CruiseControlNotReady:
case errors.As(err, &errorfactory.CruiseControlNotReady{}):
return ctrl.Result{
RequeueAfter: time.Duration(15) * time.Second,
}, nil
case errorfactory.CruiseControlTaskRunning:
case errors.As(err, &errorfactory.CruiseControlTaskRunning{}):
return ctrl.Result{
RequeueAfter: time.Duration(20) * time.Second,
}, nil
case errors.As(err, &errorfactory.CruiseControlTaskTimeout{}):
return ctrl.Result{
RequeueAfter: time.Duration(20) * time.Second,
}, nil
case errorfactory.CruiseControlTaskTimeout, errorfactory.CruiseControlTaskFailure:
case errors.As(err, &errorfactory.CruiseControlTaskFailure{}):
return ctrl.Result{
RequeueAfter: time.Duration(20) * time.Second,
}, nil
case errorfactory.PerBrokerConfigNotReady:
case errors.As(err, &errorfactory.PerBrokerConfigNotReady{}):
log.V(1).Info("dynamically updated broker configuration hasn't propagated through yet")
// for exponential backoff
return ctrl.Result{}, err
case errorfactory.LoadBalancerIPNotReady:
case errors.As(err, &errorfactory.LoadBalancerIPNotReady{}):
return ctrl.Result{
RequeueAfter: time.Duration(30) * time.Second,
}, nil
Expand Down
5 changes: 5 additions & 0 deletions controllers/kafkatopic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/banzaicloud/koperator/pkg/k8sutil"
"github.com/banzaicloud/koperator/pkg/kafkaclient"
"github.com/banzaicloud/koperator/pkg/util"
"github.com/banzaicloud/koperator/pkg/webhooks"
)

var topicFinalizer = "finalizer.kafkatopics.kafka.banzaicloud.io"
Expand Down Expand Up @@ -216,6 +217,10 @@ func (r *KafkaTopicReconciler) finalizeKafkaTopic(reqLogger logr.Logger, broker
return err
}
if exists != nil {
// When the topic is not managed by the Koperator then not our responsibility to delete it.
if val, ok := topic.GetLabels()[webhooks.ManagedByAnnotationKey]; ok && val != webhooks.ManagedByAnnotationValue {
return nil
}
// DeleteTopic with wait to make sure it goes down fully in case of cluster
// deletion.
// TODO (tinyzimmer): Perhaps this should only wait when it's the cluster
Expand Down
6 changes: 3 additions & 3 deletions controllers/kafkauser_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,14 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, request reconcile.R

user, err := pkiManager.ReconcileUserCertificate(ctx, instance, r.Scheme, cluster.Spec.GetKubernetesClusterDomain())
if err != nil {
switch errors.Cause(err).(type) {
case errorfactory.ResourceNotReady:
switch {
case errors.As(err, &errorfactory.ResourceNotReady{}):
reqLogger.Info("generated secret not found, may not be ready")
return ctrl.Result{
Requeue: true,
RequeueAfter: time.Duration(5) * time.Second,
}, nil
case errorfactory.FatalReconcileError:
case errors.As(err, &errorfactory.FatalReconcileError{}):
// TODO: (tinyzimmer) - Sleep for longer for now to give user time to see the error
// But really we should catch these kinds of issues in a pre-admission hook in a future PR
// The user can fix while this is looping and it will pick it up next reconcile attempt
Expand Down
13 changes: 3 additions & 10 deletions docs/benchmarks/infrastructure/kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,13 @@ spec:
num.io.threads=16
num.network.threads=6
kafkaHeapOpts: "-Xmx10G -Xms10G"
resourceReqs:
resourceRequirements:
limits:
memory: "16Gi"
cpu: "7500m"
requests:
memory: "12Gi"
cpu: "4000m"
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: nodepool.banzaicloud.io/name
operator: In
values:
- pool2
storageConfigs:
- mountPath: "/kafka-logs"
pvcSpec:
Expand Down Expand Up @@ -106,6 +98,7 @@ spec:
metric.sampler.partition.assignor.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.DefaultMetricSamplerPartitionAssignor
# The metric sampling interval in milliseconds
metric.sampling.interval.ms=120000
metric.anomaly.detection.interval.ms=180000
# The partition metrics window size in milliseconds
partition.metrics.window.ms=300000
# The number of partition metric windows to keep in memory
Expand Down Expand Up @@ -264,7 +257,7 @@ spec:
}
]
}
clusterConfigs: |
clusterConfig: |
{
"min.insync.replicas": 3
}
6 changes: 2 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ require (
github.com/banzaicloud/istio-client-go v0.0.17
github.com/banzaicloud/istio-operator/api/v2 v2.15.1
github.com/banzaicloud/k8s-objectmatcher v1.8.0
github.com/banzaicloud/koperator/api v0.0.0
github.com/banzaicloud/koperator/properties v0.0.0
github.com/banzaicloud/koperator/api v0.23.0
github.com/banzaicloud/koperator/properties v0.4.1
github.com/cert-manager/cert-manager v1.9.1
github.com/cisco-open/cluster-registry-controller/api v0.2.5
github.com/envoyproxy/go-control-plane v0.10.3
Expand Down Expand Up @@ -134,8 +134,6 @@ require (
)

replace (
github.com/banzaicloud/koperator/api => ./api
github.com/banzaicloud/koperator/properties => ./properties
github.com/gogo/protobuf => github.com/waynz0r/protobuf v1.3.3-0.20210811122234-64636cae0910
github.com/golang/protobuf => github.com/luciferinlove/protobuf v0.0.0-20220913214010-c63936d75066
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ github.com/banzaicloud/istio-operator/api/v2 v2.15.1 h1:BZg8COvoOJtfx/dgN7KpoOnc
github.com/banzaicloud/istio-operator/api/v2 v2.15.1/go.mod h1:5qCpwWlIfxiLvBfTvT2mD2wp5RlFCDEt8Xql4sYPNBc=
github.com/banzaicloud/k8s-objectmatcher v1.8.0 h1:Nugn25elKtPMTA2br+JgHNeSQ04sc05MDPmpJnd1N2A=
github.com/banzaicloud/k8s-objectmatcher v1.8.0/go.mod h1:p2LSNAjlECf07fbhDyebTkPUIYnU05G+WfGgkTmgeMg=
github.com/banzaicloud/koperator/api v0.23.0 h1:WQYmDvboA6VNRSFoGJTi/fjL+hEjdM/sBtP32ymKrfg=
github.com/banzaicloud/koperator/api v0.23.0/go.mod h1:VnxVrXvw0QdlvJzke5ZFr+x+RD4K6zhjHXUdBO+iD/Q=
github.com/banzaicloud/koperator/properties v0.4.1 h1:SB2QgXlcK1Dc7Z1rg65PJifErDa8OQnoWCCJgmC7SGc=
github.com/banzaicloud/koperator/properties v0.4.1/go.mod h1:TcL+llxuhW3UeQtVEDYEXGouFLF2P+LuZZVudSb6jyA=
github.com/banzaicloud/operator-tools v0.28.0 h1:GSfc0qZr6zo7WrNxdgWZE1LcTChPU8QFYOTDirYVtIM=
github.com/banzaicloud/operator-tools v0.28.0/go.mod h1:t0dyFGJUR9Q5CwsUcq1nDJC0wSZqeh6nzUZkUp3vCXg=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
Expand Down
Loading