Add NodePool support to Cluster resource #188

Merged
merged 35 commits on Nov 15, 2024
Commits (35)
6e65242
operator v1: Make featuregate EmptySeedStartCluster mandatory
birdayz Oct 7, 2024
f58a790
operator v1: do not swallow errors in ar.Ensure
birdayz Oct 7, 2024
3c32a22
operator v1: improve ghost brokers flag
birdayz Oct 7, 2024
eb973ca
operator v1: remove omitempty from NodePoolStatus replicas fields
birdayz Oct 7, 2024
83b4672
operator v1: do not allow downscale before upscale finished
birdayz Oct 7, 2024
caf75da
operator v1: implement support for multiple nodepools
birdayz Oct 7, 2024
d83ad01
operator v1: remove tests that check for deprecated RP features
birdayz Oct 7, 2024
2e08b7e
operator v1: try to fix flakiness of tests
birdayz Oct 8, 2024
cc04cf9
operator v1: support deleted nodepool in GetReplicas
birdayz Oct 8, 2024
59d6f7a
operator v1: use lower memory in tests
birdayz Oct 8, 2024
76be288
operator v1: set readiness probe timeout of 5s
birdayz Oct 9, 2024
ecde96b
operator v1: minor improvements to tests
birdayz Oct 9, 2024
2a3bfad
operator v1: rename GetReplicas to SumNodePoolReplicas
birdayz Oct 25, 2024
031fb27
operator v1: use constant for shadow-index-cache and datadir
birdayz Oct 25, 2024
76adb6f
operator v1: use provided context
birdayz Oct 25, 2024
40de1f7
operator v1: refactor package structure to avoid conflicts
birdayz Oct 25, 2024
0a02f35
operator v1: add client selector to list STS
birdayz Oct 25, 2024
cce6245
operator v1: improve splitting of NodePool part of STS name
birdayz Oct 25, 2024
8d1070d
operator v1: minor code review comments
birdayz Oct 25, 2024
1c9294f
operator v1: use NodePool AdditionalCommandlineArguments
birdayz Oct 28, 2024
c5056c3
operator v1: do not validate nodePools if replicas is null
birdayz Oct 31, 2024
2b15d46
operator v1: remove obsolete comment/log
birdayz Nov 6, 2024
1542f9b
operator v1: rename SumNodePoolReplicas to GetDesiredReplicas
birdayz Nov 8, 2024
ad85894
operator v1: change wording from primary to default
birdayz Nov 8, 2024
1db9f76
operator v1: add comment about package cycle
birdayz Nov 8, 2024
722a9fe
operator v1: remove extra newline
birdayz Nov 8, 2024
f963386
operator v1: add comment
birdayz Nov 8, 2024
7fce0e8
operator v1: stop using status for computing seed addresses
birdayz Nov 8, 2024
cdc53ac
operator v1: refactor getDecommissioningPod
birdayz Nov 8, 2024
f066d80
operator v1: remove extra newline
birdayz Nov 8, 2024
52eccd6
operator v1: change PodLabelNodeIDKey to PodNodeIDKey
birdayz Nov 8, 2024
1c1b057
operator v1: update comment
birdayz Nov 8, 2024
83ba5e2
operator v1: fix nodepools test assertions
birdayz Nov 11, 2024
9e08973
operator v1: add KUTTL test for deleting a nodepool
birdayz Nov 11, 2024
45b5f57
operator v1: improve readability of GetNodePools
birdayz Nov 14, 2024
93 changes: 64 additions & 29 deletions operator/api/vectorized/v1alpha1/cluster_types.go
@@ -13,15 +13,15 @@ import (
"fmt"
"net"
"net/url"
"slices"
"time"

cmmeta "github.com/cert-manager/cert-manager/pkg/apis/meta/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"

"github.com/redpanda-data/redpanda-operator/operator/pkg/resources/featuregates"
"k8s.io/utils/ptr"
)

const (
@@ -31,6 +31,8 @@ const (
ExternalListenerName = "kafka-external"
)

const DefaultNodePoolName = "default"

// RedpandaResourceRequirements extends corev1.ResourceRequirements
// to allow specification of resources directly passed to Redpanda that
// are different to Requests or Limits.
@@ -713,9 +715,9 @@ type LoadBalancerStatus struct {

// NodePoolStatus describes the status of the NodePool.
type NodePoolStatus struct {
CurrentReplicas int32 `json:"currentReplicas,omitempty"`
Replicas int32 `json:"replicas,omitempty"`
ReadyReplicas int32 `json:"readyReplicas,omitempty"`
CurrentReplicas int32 `json:"currentReplicas"`
Replicas int32 `json:"replicas"`
ReadyReplicas int32 `json:"readyReplicas"`
// Indicates that a nodePool's pods are restarting.
// +optional
Restarting bool `json:"restarting"`
@@ -1214,36 +1216,31 @@ func (s *ClusterStatus) SetRestarting(restarting bool) {
s.DeprecatedUpgrading = restarting
}

// GetCurrentReplicas returns the current number of replicas that the controller wants to run.
// It returns 1 when not initialized (as fresh clusters start from 1 replica)
func (r *Cluster) GetCurrentReplicas() int32 {
if r == nil {
// GetDesiredReplicas returns the total number of replicas desired across all NodePools,
// including NodePools that were removed from the spec but are still draining.
func (r *Cluster) GetDesiredReplicas() int32 {
if r == nil {
return 0
}
nps := r.GetNodePoolsFromSpec()
if len(nps) == 0 {
return 0
}
if r.Status.CurrentReplicas <= 0 {
// Not initialized, let's give the computed value
return r.ComputeInitialCurrentReplicasField()
}
return r.Status.CurrentReplicas
}

// ComputeInitialCurrentReplicasField calculates the initial value for status.currentReplicas.
//
// It needs to consider the following cases:
// - EmptySeedStartCluster is supported: we use spec.replicas as the starting point
// - Fresh cluster: we start from 1 replicas, then upscale if needed (initialization to bypass https://github.com/redpanda-data/redpanda/issues/333)
// - Existing clusters: we keep spec.replicas as starting point
func (r *Cluster) ComputeInitialCurrentReplicasField() int32 {
if r == nil {
return 0
replicas := int32(0)

var npNamesSeen []string

for i := range nps {
np := nps[i]
replicas += ptr.Deref(np.Replicas, 0)
npNamesSeen = append(npNamesSeen, np.Name)
}
if r.Status.Replicas > 1 || r.Status.ReadyReplicas > 1 || len(r.Status.Nodes.Internal) > 1 || featuregates.EmptySeedStartCluster(r.Spec.Version) {
// A cluster seems to be already running, we start from the existing amount of replicas
return *r.Spec.Replicas

// NodePools deleted from the spec may not be fully removed yet (still scaling down);
// source their replicas from the status instead.
for npName, npStatus := range r.Status.NodePools {
if !slices.Contains(npNamesSeen, npName) {
replicas += npStatus.Replicas
}
}

// Clusters start from a single replica, then upscale
return 1
return replicas
}

// TLSConfig is a generic TLS configuration
@@ -1422,3 +1419,41 @@ func (r *Cluster) GetDecommissionBrokerID() *int32 {
func (r *Cluster) SetDecommissionBrokerID(id *int32) {
r.Status.DecommissioningNode = id
}

// GetNodePoolsFromSpec returns the NodePools defined in the spec.
// This includes the implicit default NodePool (driven by spec.replicas and the related
// top-level fields) and the NodePools defined in the nodePools field.
func (r *Cluster) GetNodePoolsFromSpec() []NodePoolSpec {
out := make([]NodePoolSpec, 0)
if r.Spec.Replicas != nil {
defaultNodePool := NodePoolSpec{
Name: DefaultNodePoolName,
Replicas: r.Spec.Replicas,
Tolerations: r.Spec.Tolerations,
NodeSelector: r.Spec.NodeSelector,
Storage: r.Spec.Storage,
Resources: r.Spec.Resources,
AdditionalCommandlineArguments: r.Spec.Configuration.AdditionalCommandlineArguments,
}
if r.Spec.CloudStorage.CacheStorage != nil {
defaultNodePool.CloudCacheStorage = *r.Spec.CloudStorage.CacheStorage
}
out = append(out, defaultNodePool)
}
out = append(out, r.Spec.NodePools...)

return out
}

func (r *Cluster) CalculateCurrentReplicas() int32 {
var result int32
for _, np := range r.Status.NodePools {
result += np.CurrentReplicas
}
return result
}

type NodePoolSpecWithDeleted struct {
NodePoolSpec
Deleted bool
}
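
A minimal usage sketch of the new helpers (hypothetical, not part of this diff): it builds a Cluster whose legacy spec.replicas still drives the implicit "default" NodePool alongside an explicitly named pool, then sums the desired replicas. Field names are taken from the code above; the program itself is purely illustrative.

package main

import (
	"fmt"

	"k8s.io/utils/ptr"

	"github.com/redpanda-data/redpanda-operator/operator/api/vectorized/v1alpha1"
)

func main() {
	cluster := &v1alpha1.Cluster{}

	// spec.replicas keeps working: GetNodePoolsFromSpec surfaces it as the implicit "default" pool.
	cluster.Spec.Replicas = ptr.To(int32(3))

	// Additional pools come from spec.nodePools.
	cluster.Spec.NodePools = []v1alpha1.NodePoolSpec{
		{Name: "high-mem", Replicas: ptr.To(int32(2))},
	}

	for _, np := range cluster.GetNodePoolsFromSpec() {
		fmt.Printf("pool %q -> %d replicas\n", np.Name, ptr.Deref(np.Replicas, 0))
	}
	// pool "default" -> 3 replicas
	// pool "high-mem" -> 2 replicas

	fmt.Println(cluster.GetDesiredReplicas()) // 5
}

Pools that have been removed from the spec but are still draining contribute their status replicas on top of this sum, which is what keeps GetDesiredReplicas stable while a NodePool deletion is in progress.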
35 changes: 0 additions & 35 deletions operator/api/vectorized/v1alpha1/cluster_types_test.go
@@ -17,10 +17,8 @@ import (
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/utils/ptr"

"github.com/redpanda-data/redpanda-operator/operator/api/vectorized/v1alpha1"
"github.com/redpanda-data/redpanda-operator/operator/pkg/resources/featuregates"
)

//nolint:funlen // this is ok for a test
@@ -219,36 +217,3 @@ func TestConditions(t *testing.T) {
assert.Equal(t, condTime, cond2.LastTransitionTime)
})
}

func TestInitialReplicas(t *testing.T) {
// backward compatibility. Remove when v22.2 is no longer supported.
Review comment (PR author): This test is obsolete now that clusters without the feature gate are no longer supported.

cluster := v1alpha1.Cluster{}
cluster.Spec.Version = featuregates.V22_2_1.String()
cluster.Spec.Replicas = ptr.To(int32(3))
assert.Equal(t, int32(1), cluster.GetCurrentReplicas())
cluster.Status.Replicas = 2
assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
cluster.Status.Replicas = 0
cluster.Status.ReadyReplicas = 2
assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
cluster.Status.ReadyReplicas = 0
cluster.Status.Nodes.Internal = []string{"1", "2"}
assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
cluster.Status.Nodes.Internal = nil
assert.Equal(t, int32(1), cluster.GetCurrentReplicas())

// test with latest version
cluster = v1alpha1.Cluster{}
cluster.Spec.Replicas = ptr.To(int32(3))
assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
cluster.Status.Replicas = 2
assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
cluster.Status.Replicas = 0
cluster.Status.ReadyReplicas = 2
assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
cluster.Status.ReadyReplicas = 0
cluster.Status.Nodes.Internal = []string{"1", "2"}
assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
cluster.Status.Nodes.Internal = nil
assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
}
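
The deleted test has no direct successor in this diff; a sketch of an equivalent test against the new GetDesiredReplicas could look like the following. This is hypothetical only — in particular, it assumes Status.NodePools is a map keyed by pool name holding NodePoolStatus values, as the code above suggests.

package v1alpha1_test

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"k8s.io/utils/ptr"

	"github.com/redpanda-data/redpanda-operator/operator/api/vectorized/v1alpha1"
)

func TestGetDesiredReplicas(t *testing.T) {
	cluster := v1alpha1.Cluster{}

	// No pools defined anywhere: nothing is desired.
	assert.Equal(t, int32(0), cluster.GetDesiredReplicas())

	// Default pool (spec.replicas) plus one named pool.
	cluster.Spec.Replicas = ptr.To(int32(3))
	cluster.Spec.NodePools = []v1alpha1.NodePoolSpec{
		{Name: "pool-a", Replicas: ptr.To(int32(2))},
	}
	assert.Equal(t, int32(5), cluster.GetDesiredReplicas())

	// A pool removed from the spec but still present in status (scaling down)
	// keeps contributing its replicas until it is fully drained.
	cluster.Status.NodePools = map[string]v1alpha1.NodePoolStatus{
		"pool-b": {Replicas: 1},
	}
	assert.Equal(t, int32(6), cluster.GetDesiredReplicas())
}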
155 changes: 114 additions & 41 deletions operator/api/vectorized/v1alpha1/cluster_webhook.go
@@ -108,8 +108,19 @@ func (r *Cluster) SetupWebhookWithManager(mgr ctrl.Manager) error {

var _ webhook.Defaulter = &Cluster{}

func redpandaResourceFields(c *Cluster) redpandaResourceField {
return redpandaResourceField{&c.Spec.Resources, field.NewPath("spec").Child("resources")}
func redpandaResourceFields(c *Cluster) []redpandaResourceField {
var result []redpandaResourceField
// Default NodePool
if c.Spec.Replicas != nil {
result = append(result, redpandaResourceField{&c.Spec.Resources, field.NewPath("spec").Child("resources")})
}

// Additional NodePools
for _, np := range c.Spec.NodePools {
result = append(result, redpandaResourceField{&np.Resources, field.NewPath("spec").Child("nodePools").Child("resources")})
}

return result
}

func sidecarResourceFields(c *Cluster) []resourceField {
Expand Down Expand Up @@ -259,7 +270,9 @@ func (r *Cluster) validateCommon(log logr.Logger) field.ErrorList {
for _, rf := range sidecarResourceFields(r) {
allErrs = append(allErrs, r.validateResources(rf)...)
}
allErrs = append(allErrs, r.validateRedpandaResources(redpandaResourceFields(r))...)
for _, rf := range redpandaResourceFields(r) {
allErrs = append(allErrs, r.validateRedpandaResources(rf)...)
}
allErrs = append(allErrs, r.validateArchivalStorage()...)
allErrs = append(allErrs, r.validatePodDisruptionBudget()...)
if featuregates.InternalTopicReplication(r.Spec.Version) {
@@ -270,16 +283,13 @@ func (r *Cluster) validateScaling() field.ErrorList {
}

func (r *Cluster) validateScaling() field.ErrorList {
replicas := r.GetDesiredReplicas()

var allErrs field.ErrorList
if r.Spec.Replicas == nil {
if replicas <= 0 {
allErrs = append(allErrs,
field.Invalid(field.NewPath("spec").Child("replicas"),
r.Spec.Replicas,
"replicas must be specified explicitly"))
} else if *r.Spec.Replicas <= 0 {
allErrs = append(allErrs,
field.Invalid(field.NewPath("spec").Child("replicas"),
r.Spec.Replicas,
field.Invalid(field.NewPath("spec").Child("nodepools"),
replicas,
"downscaling is not allowed to less than 1 instance"))
}

@@ -770,26 +780,54 @@ func (r *Cluster) validateRedpandaMemory() field.ErrorList {
}
var allErrs field.ErrorList

// Ensure a requested 2GB of memory per core
requests := r.Spec.Resources.Requests.DeepCopy()
requests.Cpu().RoundUp(0)
requestedCores := requests.Cpu().Value()
if r.Spec.Resources.Requests.Memory().Value() < requestedCores*MinimumMemoryPerCore {
allErrs = append(allErrs,
field.Invalid(
field.NewPath("spec").Child("resources").Child("requests").Child("memory"),
r.Spec.Resources.Requests.Memory(),
"requests.memory < 2Gi per core; either decrease requests.cpu or increase requests.memory"))
for i := range r.Spec.NodePools {
np := r.Spec.NodePools[i]

// Ensure a requested 2GB of memory per core
requests := np.Resources.Requests.DeepCopy()
requests.Cpu().RoundUp(0)
requestedCores := requests.Cpu().Value()
if np.Resources.Requests.Memory().Value() < requestedCores*MinimumMemoryPerCore {
allErrs = append(allErrs,
field.Invalid(
field.NewPath("spec").Child("nodePool").Child("resources").Child("requests").Child("memory"),
np.Resources.Requests.Memory(),
"requests.memory < 2Gi per core; either decrease requests.cpu or increase requests.memory"))
}

redpandaCores := np.Resources.RedpandaCPU().Value()
minimumMemoryPerCore := int64(math.Floor(MinimumMemoryPerCore * RedpandaMemoryAllocationRatio))
if !np.Resources.RedpandaMemory().IsZero() && np.Resources.RedpandaMemory().Value() < redpandaCores*minimumMemoryPerCore {
allErrs = append(allErrs,
field.Invalid(
field.NewPath("spec").Child("resources").Child("redpanda").Child("memory"),
r.Spec.Resources.Requests.Memory(),
"redpanda.memory < 2Gi per core; either decrease redpanda.cpu or increase redpanda.memory"))
}
}

redpandaCores := r.Spec.Resources.RedpandaCPU().Value()
minimumMemoryPerCore := int64(math.Floor(MinimumMemoryPerCore * RedpandaMemoryAllocationRatio))
if !r.Spec.Resources.RedpandaMemory().IsZero() && r.Spec.Resources.RedpandaMemory().Value() < redpandaCores*minimumMemoryPerCore {
allErrs = append(allErrs,
field.Invalid(
field.NewPath("spec").Child("resources").Child("redpanda").Child("memory"),
r.Spec.Resources.Requests.Memory(),
"redpanda.memory < 2Gi per core; either decrease redpanda.cpu or increase redpanda.memory"))
if r.Spec.Replicas != nil {
// Ensure a requested 2GB of memory per core
requests := r.Spec.Resources.Requests.DeepCopy()
requests.Cpu().RoundUp(0)
requestedCores := requests.Cpu().Value()
if r.Spec.Resources.Requests.Memory().Value() < requestedCores*MinimumMemoryPerCore {
allErrs = append(allErrs,
field.Invalid(
field.NewPath("spec").Child("resources").Child("requests").Child("memory"),
r.Spec.Resources.Requests.Memory(),
"requests.memory < 2Gi per core; either decrease requests.cpu or increase requests.memory"))
}

redpandaCores := r.Spec.Resources.RedpandaCPU().Value()
minimumMemoryPerCore := int64(math.Floor(MinimumMemoryPerCore * RedpandaMemoryAllocationRatio))
if !r.Spec.Resources.RedpandaMemory().IsZero() && r.Spec.Resources.RedpandaMemory().Value() < redpandaCores*minimumMemoryPerCore {
allErrs = append(allErrs,
field.Invalid(
field.NewPath("spec").Child("resources").Child("redpanda").Child("memory"),
r.Spec.Resources.Requests.Memory(),
"redpanda.memory < 2Gi per core; either decrease redpanda.cpu or increase redpanda.memory"))
}
}

return allErrs
@@ -804,19 +842,54 @@ func (r *Cluster) validateRedpandaCoreChanges(old *Cluster) field.ErrorList {
}
var allErrs field.ErrorList

oldCPURequest := old.Spec.Resources.RedpandaCPU()
newCPURequest := r.Spec.Resources.RedpandaCPU()
if oldCPURequest != nil && newCPURequest != nil {
oldCores := oldCPURequest.Value()
newCores := newCPURequest.Value()
if r.Spec.Replicas != nil {
oldCPURequest := old.Spec.Resources.RedpandaCPU()
newCPURequest := r.Spec.Resources.RedpandaCPU()
if oldCPURequest != nil && newCPURequest != nil {
oldCores := oldCPURequest.Value()
newCores := newCPURequest.Value()

if newCores < oldCores {
minAllowedCPU := (oldCores-1)*1000 + 1
allErrs = append(allErrs,
field.Invalid(
field.NewPath("spec").Child("resources").Child("requests").Child("cpu"),
r.Spec.Resources.Requests.Cpu(),
fmt.Sprintf("CPU request must not be decreased; increase requests.cpu or redpanda.cpu to at least %dm", minAllowedCPU)))
if newCores < oldCores {
minAllowedCPU := (oldCores-1)*1000 + 1
allErrs = append(allErrs,
field.Invalid(
field.NewPath("spec").Child("resources").Child("requests").Child("cpu"),
r.Spec.Resources.Requests.Cpu(),
fmt.Sprintf("CPU request must not be decreased; increase requests.cpu or redpanda.cpu to at least %dm", minAllowedCPU)))
}
}
}

// Apply the same check to each NodePool.
for i := range r.Spec.NodePools {
np := r.Spec.NodePools[i]
var oldNp *NodePoolSpec
for y := range old.Spec.NodePools {
old := old.Spec.NodePools[y]
if old.Name == np.Name {
oldNp = &old
}
}

if oldNp == nil {
continue
}

oldCPURequest := oldNp.Resources.RedpandaCPU()
newCPURequest := np.Resources.RedpandaCPU()

if oldCPURequest != nil && newCPURequest != nil {
oldCores := oldCPURequest.Value()
newCores := newCPURequest.Value()

if newCores < oldCores {
minAllowedCPU := (oldCores-1)*1000 + 1
allErrs = append(allErrs,
field.Invalid(
field.NewPath("spec").Child("resources").Child("requests").Child("cpu"),
np.Resources.Requests.Cpu(),
fmt.Sprintf("CPU request must not be decreased; increase requests.cpu or redpanda.cpu to at least %dm", minAllowedCPU)))
}
}
}

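
For reference, the 2Gi-per-core rule that validateRedpandaMemory now enforces both for spec.resources and for each entry in spec.nodePools can be reproduced in isolation. The sketch below is illustrative only; minimumMemoryPerCore mirrors the operator's MinimumMemoryPerCore constant, and the helper name is made up for this example.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// 2Gi per requested core, mirroring the webhook's MinimumMemoryPerCore.
const minimumMemoryPerCore = 2 * 1024 * 1024 * 1024

// meetsMemoryPerCore reports whether a pool's resource requests satisfy the rule.
func meetsMemoryPerCore(requests corev1.ResourceList) bool {
	cpu := requests.Cpu().DeepCopy()
	cpu.RoundUp(0) // fractional cores count as a full core, as in the webhook
	return requests.Memory().Value() >= cpu.Value()*minimumMemoryPerCore
}

func main() {
	ok := corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse("1500m"), // rounds up to 2 cores -> needs 4Gi
		corev1.ResourceMemory: resource.MustParse("4Gi"),
	}
	tooSmall := corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse("2"),
		corev1.ResourceMemory: resource.MustParse("3Gi"),
	}
	fmt.Println(meetsMemoryPerCore(ok))       // true
	fmt.Println(meetsMemoryPerCore(tooSmall)) // false
}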