Skip to content

Commit

Permalink
Merge branch 'master' into test/de2e-koperator-uninstall
Browse files Browse the repository at this point in the history
  • Loading branch information
bartam1 authored Jun 16, 2023
2 parents be64e9e + 367f4db commit c31e4dd
Show file tree
Hide file tree
Showing 9 changed files with 405 additions and 65 deletions.
20 changes: 19 additions & 1 deletion api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
DefaultServiceAccountName = "default"
// DefaultAnyCastPort kafka anycast port that can be used by clients for metadata queries
DefaultAnyCastPort = 29092
// DefaultIngressControllerTargetPort is the default container port for the ingress controller
DefaultIngressControllerTargetPort = 29092
// DefaultEnvoyHealthCheckPort envoy health check port
DefaultEnvoyHealthCheckPort = 8080
// DefaultEnvoyAdminPort envoy admin port
Expand Down Expand Up @@ -469,6 +471,15 @@ func (c ExternalListenerConfig) GetAnyCastPort() int32 {
return *c.AnyCastPort
}

// GetIngressControllerTargetPort returns the IngressControllerTargetPort if it is defined,
// otherwise it returns the DefaultIngressControllerTargetPort value
func (c ExternalListenerConfig) GetIngressControllerTargetPort() int32 {
if c.IngressControllerTargetPort == nil {
return DefaultIngressControllerTargetPort
}
return *c.IngressControllerTargetPort
}

// GetServiceAnnotations returns a copy of the ServiceAnnotations field.
func (c IngressServiceSettings) GetServiceAnnotations() map[string]string {
return util.CloneMap(c.ServiceAnnotations)
Expand Down Expand Up @@ -544,7 +555,14 @@ type ExternalListenerConfig struct {
// If accessMethod is Nodeport and externalStartingPort is set to 0 then the broker IDs are not added and the Nodeport port numbers will be chosen automatically by the K8s Service controller
ExternalStartingPort int32 `json:"externalStartingPort"`
// configuring AnyCastPort allows kafka cluster access without specifying the exact broker
// If not defined, 29092 will be used for external clients to reach the kafka cluster
AnyCastPort *int32 `json:"anyCastPort,omitempty"`
// +kubebuilder:validation:Minimum=1024
// +kubebuilder:validation:Maximum=65535
// +optional
// IngressControllerTargetPort defines the container port that the ingress controller uses for handling external traffic.
// If not defined, 29092 will be used as the default IngressControllerTargetPort value.
IngressControllerTargetPort *int32 `json:"ingressControllerTargetPort,omitempty"`
// +kubebuilder:validation:Enum=LoadBalancer;NodePort
// accessMethod defines the method which the external listener is exposed through.
// Two types are supported LoadBalancer and NodePort.
Expand All @@ -553,7 +571,7 @@ type ExternalListenerConfig struct {
// +optional
AccessMethod corev1.ServiceType `json:"accessMethod,omitempty"`
// Config allows to specify ingress controller configuration per external listener
// if set overrides the the default `KafkaClusterSpec.IstioIngressConfig` or `KafkaClusterSpec.EnvoyConfig` for this external listener.
// if set, it overrides the default `KafkaClusterSpec.IstioIngressConfig` or `KafkaClusterSpec.EnvoyConfig` for this external listener.
// +optional
Config *Config `json:"config,omitempty"`
}
Expand Down
5 changes: 5 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 13 additions & 2 deletions charts/kafka-operator/templates/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19078,12 +19078,14 @@ spec:
type: string
anyCastPort:
description: configuring AnyCastPort allows kafka cluster
access without specifying the exact broker
access without specifying the exact broker If not defined,
29092 will be used for external clients to reach the kafka
cluster
format: int32
type: integer
config:
description: Config allows to specify ingress controller
configuration per external listener if set overrides the
configuration per external listener if set, it overrides
the default `KafkaClusterSpec.IstioIngressConfig` or `KafkaClusterSpec.EnvoyConfig`
for this external listener.
properties:
Expand Down Expand Up @@ -21473,6 +21475,15 @@ spec:
public IP (see "brokerConfig.nodePortExternalIP") is advertised
on the address having the following format: <kafka-cluster-name>-<broker-id>.<namespace><value-specified-in-hostnameOverride-field>'
type: string
ingressControllerTargetPort:
description: IngressControllerTargetPort defines the container
port that the ingress controller uses for handling external
traffic. If not defined, 29092 will be used as the default
IngressControllerTargetPort value.
format: int32
maximum: 65535
minimum: 1024
type: integer
name:
pattern: ^[a-z0-9\-]+
type: string
Expand Down
15 changes: 13 additions & 2 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18915,12 +18915,14 @@ spec:
type: string
anyCastPort:
description: configuring AnyCastPort allows kafka cluster
access without specifying the exact broker
access without specifying the exact broker If not defined,
29092 will be used for external clients to reach the kafka
cluster
format: int32
type: integer
config:
description: Config allows to specify ingress controller
configuration per external listener if set overrides the
configuration per external listener if set, it overrides
the default `KafkaClusterSpec.IstioIngressConfig` or `KafkaClusterSpec.EnvoyConfig`
for this external listener.
properties:
Expand Down Expand Up @@ -21310,6 +21312,15 @@ spec:
public IP (see "brokerConfig.nodePortExternalIP") is advertised
on the address having the following format: <kafka-cluster-name>-<broker-id>.<namespace><value-specified-in-hostnameOverride-field>'
type: string
ingressControllerTargetPort:
description: IngressControllerTargetPort defines the container
port that the ingress controller uses for handling external
traffic. If not defined, 29092 will be used as the default
IngressControllerTargetPort value.
format: int32
maximum: 65535
minimum: 1024
type: integer
name:
pattern: ^[a-z0-9\-]+
type: string
Expand Down
137 changes: 130 additions & 7 deletions controllers/cruisecontroloperation_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"reflect"
"sort"
"strings"
"time"

"emperror.dev/errors"
Expand All @@ -34,19 +35,21 @@ import (

"github.com/banzaicloud/go-cruise-control/pkg/types"

apiutil "github.com/banzaicloud/koperator/api/util"
banzaiv1alpha1 "github.com/banzaicloud/koperator/api/v1alpha1"
banzaiv1beta1 "github.com/banzaicloud/koperator/api/v1beta1"
"github.com/banzaicloud/koperator/pkg/scale"
"github.com/banzaicloud/koperator/pkg/util"
)

const (
defaultFailedTasksHistoryMaxLength = 50
ccOperationFinalizerGroup = "finalizer.cruisecontroloperations.kafka.banzaicloud.io"
ccOperationForStopExecution = "ccOperationStopExecution"
ccOperationFirstExecution = "ccOperationFirstExecution"
ccOperationRetryExecution = "ccOperationRetryExecution"
ccOperationInProgress = "ccOperationInProgress"
defaultFailedTasksHistoryMaxLength = 50
ccOperationFinalizerGroup = "finalizer.cruisecontroloperations.kafka.banzaicloud.io"
ccOperationForStopExecution = "ccOperationStopExecution"
ccOperationFirstExecution = "ccOperationFirstExecution"
ccOperationRetryExecution = "ccOperationRetryExecution"
ccOperationInProgress = "ccOperationInProgress"
defaultCruiseControlStatusOperationMaxDuration = time.Duration(5) * time.Minute
)

var (
Expand Down Expand Up @@ -97,6 +100,12 @@ func (r *CruiseControlOperationReconciler) Reconcile(ctx context.Context, reques
return reconciled()
}

// Skip reconciliation for Cruise Control Status operation
if currentCCOperation.CurrentTaskOperation() == banzaiv1alpha1.OperationStatus {
log.V(1).Info("skipping reconciliation for Cruise Control Status operation")
return reconciled()
}

// When the task is done we can remove the finalizer instantly thus we can return fast here.
if isFinalizerNeeded(currentCCOperation) && currentCCOperation.IsDone() {
controllerutil.RemoveFinalizer(currentCCOperation, ccOperationFinalizerGroup)
Expand Down Expand Up @@ -144,7 +153,7 @@ func (r *CruiseControlOperationReconciler) Reconcile(ctx context.Context, reques
}

// Checking Cruise Control health
status, err := r.scaler.Status(ctx)
status, err := r.getStatus(ctx, log, kafkaCluster, ccOperationListClusterWide)
if err != nil {
log.Error(err, "could not get Cruise Control status")
return requeueAfter(defaultRequeueIntervalInSeconds)
Expand Down Expand Up @@ -470,6 +479,120 @@ func (r *CruiseControlOperationReconciler) updateCurrentTasks(ctx context.Contex
return nil
}

// getStatus returns the internal state of Cruise Control.
//
// The logic is the following:
// - If the Cruise Control makes the Status request sync, then the result will be returned.
// - If the Cruise Control makes the Status request async, then a new Status CruiseControlOperation
// will be created and an error will be returned to indicate that Cruise Control is not ready yet.
// - If there is already a Status CruiseControlOperation in progress, then it will be updated and
// the result will be returned.
func (r *CruiseControlOperationReconciler) getStatus(
ctx context.Context,
log logr.Logger,
kafkaCluster *banzaiv1beta1.KafkaCluster,
ccOperationListClusterWide banzaiv1alpha1.CruiseControlOperationList,
) (scale.CruiseControlStatus, error) {
var statusOperation *banzaiv1alpha1.CruiseControlOperation
for i := range ccOperationListClusterWide.Items {
ccOperation := &ccOperationListClusterWide.Items[i]
// ignoring the error here to continue processing the operations,
// even if the user does not provide a KafkaClusterRef label on the CCOperation then the ref will be an empty object (not nil) and the filter will skip it.
ref, _ := kafkaClusterReference(ccOperation)
if ref.Name == kafkaCluster.Name && ref.Namespace == kafkaCluster.Namespace &&
ccOperation.CurrentTaskOperation() == banzaiv1alpha1.OperationStatus && ccOperation.IsCurrentTaskRunning() {
statusOperation = ccOperation
break
}
}

if statusOperation != nil {
res, err := r.scaler.StatusTask(ctx, statusOperation.CurrentTaskID())
if err != nil {
return scale.CruiseControlStatus{}, errors.WrapIfWithDetails(err, "could not get the latest state of Status CruiseControlOperation", "name", statusOperation.GetName(), "namespace", statusOperation.GetNamespace())
}
if err := updateResult(log, res.TaskResult, statusOperation, false); err != nil {
return scale.CruiseControlStatus{}, errors.WrapIfWithDetails(err, "could not update the state of Status CruiseControlOperation", "name", statusOperation.GetName(), "namespace", statusOperation.GetNamespace())
}

err = r.Status().Update(ctx, statusOperation)
if err != nil {
return scale.CruiseControlStatus{}, errors.WrapIfWithDetails(err, "could not update the state of Status CruiseControlOperation", "name", statusOperation.GetName(), "namespace", statusOperation.GetNamespace())
}

if statusOperation.CurrentTask().Finished != nil &&
statusOperation.CurrentTask().Finished.Time.Sub(statusOperation.CurrentTask().Started.Time) > defaultCruiseControlStatusOperationMaxDuration {
return scale.CruiseControlStatus{}, errors.New("the Cruise Control status operation took too long to finish")
}

if res.Status == nil {
return scale.CruiseControlStatus{}, errors.New("could not get Cruise Control status")
}

return *res.Status, nil
}

res, err := r.scaler.Status(ctx)
if err != nil {
return scale.CruiseControlStatus{}, errors.WrapIfWithDetails(err, "could not get Cruise Control status")
}

if res.Status == nil {
operationTTLSecondsAfterFinished := kafkaCluster.Spec.CruiseControlConfig.CruiseControlOperationSpec.GetTTLSecondsAfterFinished()
operation, err := r.createCCOperation(ctx, kafkaCluster, operationTTLSecondsAfterFinished, banzaiv1alpha1.OperationStatus)
if err != nil {
return scale.CruiseControlStatus{}, errors.WrapIfWithDetails(err, "could not create a new Status CruiseControlOperation")
}
if err = updateResult(log, res.TaskResult, operation, true); err != nil {
return scale.CruiseControlStatus{}, errors.WrapIfWithDetails(err, "could not update the state of Status CruiseControlOperation", "name", statusOperation.GetName(), "namespace", statusOperation.GetNamespace())
}
err = r.Status().Update(ctx, operation)
if err != nil {
return scale.CruiseControlStatus{}, errors.WrapIfWithDetails(err, "could not update the state of Status CruiseControlOperation", "name", statusOperation.GetName(), "namespace", statusOperation.GetNamespace())
}

return scale.CruiseControlStatus{}, errors.New("could not get Cruise Control status, the operation is still in progress")
}

return *res.Status, nil
}

func (r *CruiseControlOperationReconciler) createCCOperation(
ctx context.Context,
kafkaCluster *banzaiv1beta1.KafkaCluster,
ttlSecondsAfterFinished *int,
operationType banzaiv1alpha1.CruiseControlTaskOperation,
) (*banzaiv1alpha1.CruiseControlOperation, error) {
operation := &banzaiv1alpha1.CruiseControlOperation{
ObjectMeta: v1.ObjectMeta{
GenerateName: fmt.Sprintf("%s-%s-", kafkaCluster.Name, strings.ReplaceAll(string(operationType), "_", "")),
Namespace: kafkaCluster.Namespace,
Labels: apiutil.LabelsForKafka(kafkaCluster.Name),
},
}

if ttlSecondsAfterFinished != nil {
operation.Spec.TTLSecondsAfterFinished = ttlSecondsAfterFinished
}

if err := controllerutil.SetControllerReference(kafkaCluster, operation, r.Scheme); err != nil {
return nil, err
}
if err := r.Client.Create(ctx, operation); err != nil {
return nil, err
}

operation.Status.CurrentTask = &banzaiv1alpha1.CruiseControlTask{
Operation: operationType,
}

if err := r.Status().Update(ctx, operation); err != nil {
return nil, err
}

return operation, nil
}

func isWaitingForFinalization(ccOperation *banzaiv1alpha1.CruiseControlOperation) bool {
return ccOperation.IsCurrentTaskRunning() && !ccOperation.ObjectMeta.DeletionTimestamp.IsZero() && controllerutil.ContainsFinalizer(ccOperation, ccOperationFinalizerGroup)
}
Expand Down
Loading

0 comments on commit c31e4dd

Please sign in to comment.