v2: implement post-install/upgrade job in controller
Prior to this commit, the "defluxed" operator would not reconcile the cluster
configuration because it skips over Helm hooks.

This commit adds support for cluster configuration reconciliation from the
redpanda controller itself, reusing as much of the job functionality as
possible.
chrisseto committed Nov 8, 2024
1 parent d329b7d commit a1865c2
Showing 22 changed files with 786 additions and 145 deletions.
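The heart of the change is that the cluster-config sync logic, previously run only as a Helm post-install/post-upgrade Job, is now exposed as a reusable Syncer that the controller can invoke directly. The following is a minimal sketch of how a reconcile path might drive it; the import paths, the helper name syncClusterConfig, and the way the admin client and desired config are obtained are assumptions for illustration, not part of this commit.

// Sketch only: reusing the Syncer from operator/cmd/syncclusterconfig in a
// controller reconcile path. Assumes the admin client and desired config are
// derived from the Redpanda resource being reconciled.
package controller

import (
	"context"

	"github.com/redpanda-data/common-go/rpadmin"
	"github.com/redpanda-data/redpanda-operator/operator/cmd/syncclusterconfig"
)

func syncClusterConfig(ctx context.Context, admin *rpadmin.AdminAPI, desired map[string]any, usersTXT map[string][]byte) error {
	// Sync merges superusers from the users.txt contents into the desired
	// config, diffs it against the live cluster config, and patches the result.
	syncer := syncclusterconfig.Syncer{Client: admin}
	return syncer.Sync(ctx, desired, usersTXT)
}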
1 change: 0 additions & 1 deletion go.work.sum
@@ -1806,7 +1806,6 @@ github.com/redpanda-data/common-go/rpadmin v0.1.7-0.20240916201938-8d748d9ac10b/
github.com/redpanda-data/helm-charts v0.0.0-20240911060052-2bf9dd6f0996/go.mod h1:uEMmuH+gTppAsZZNYlUbh6tuxN3fqffWY0Bi8AcE2Zk=
github.com/redpanda-data/helm-charts v0.0.0-20240916201426-9ca3b128bb8e/go.mod h1:uEMmuH+gTppAsZZNYlUbh6tuxN3fqffWY0Bi8AcE2Zk=
github.com/redpanda-data/helm-charts v0.0.0-20241025092026-69353dfce9a1/go.mod h1:dmmGZo7DuHNnCy0QOykXN2sY9QI8kbdlkSKgIkCT978=
github.com/redpanda-data/helm-charts v0.0.0-20241030170802-ad1edfc56b70/go.mod h1:dmmGZo7DuHNnCy0QOykXN2sY9QI8kbdlkSKgIkCT978=
github.com/redpanda-data/redpanda/src/go/rpk v0.0.0-20240105044330-c094966ca0cf/go.mod h1:SaSp5/JwdLHu8ZU82wFbXD8/oE4UWB+8ZkjWWreAt7Y=
github.com/rhnvrm/simples3 v0.6.1 h1:H0DJwybR6ryQE+Odi9eqkHuzjYAeJgtGcGtuBwOhsH8=
github.com/rickb777/period v1.0.6 h1:f4TcHBtL/4qa4D44eqgxs7785/kfLKUjRI7XYI2HCvk=
27 changes: 21 additions & 6 deletions operator/api/redpanda/v1alpha2/redpanda_types.go
@@ -13,6 +13,7 @@ import (
"encoding/json"
"fmt"

"github.com/cockroachdb/errors"
helmv2beta2 "github.com/fluxcd/helm-controller/api/v2beta2"
"github.com/fluxcd/pkg/apis/meta"
redpandachart "github.com/redpanda-data/helm-charts/charts/redpanda"
@@ -27,7 +28,13 @@ import (
"github.com/redpanda-data/redpanda-operator/operator/api/vectorized/v1alpha1"
)

var RedpandaChartRepository = "https://charts.redpanda.com/"
const (
RedpandaChartRepository = "https://charts.redpanda.com/"

// ClusterConfigSynced is a condition indicating whether or not the
// redpanda cluster's configuration is up to date with the desired config.
ClusterConfigSynced = "ClusterConfigSynced"
)

type ChartRef struct {
// Specifies the name of the chart to deploy.
@@ -179,11 +186,6 @@ func init() {
SchemeBuilder.Register(&Redpanda{}, &RedpandaList{})
}

// GetHelmRelease returns the namespace and name of the HelmRelease.
func (in *RedpandaStatus) GetHelmRelease() string {
return in.HelmRelease
}

func (in *Redpanda) GetHelmReleaseName() string {
return in.Name
}
@@ -206,6 +208,10 @@ func (in *Redpanda) ValuesJSON() (*apiextensionsv1.JSON, error) {
return values, nil
}

func (in *Redpanda) GenerationObserved() bool {
return in.Generation != 0 && in.Generation == in.Status.ObservedGeneration
}

// RedpandaReady registers a successful reconciliation of the given HelmRelease.
func RedpandaReady(rp *Redpanda) *Redpanda {
newCondition := metav1.Condition{
@@ -246,6 +252,15 @@ func (in *Redpanda) OwnerShipRefObj() metav1.OwnerReference {
}
}

func (in *Redpanda) GetValues() (redpandachart.Values, error) {
values, err := redpandachart.Chart.LoadValues(in.Spec.ClusterSpec)
if err != nil {
return redpandachart.Values{}, errors.WithStack(err)
}

return helmette.Unwrap[redpandachart.Values](values), nil
}

func (in *Redpanda) GetDot(restConfig *rest.Config) (*helmette.Dot, error) {
var values []byte
var partial redpandachart.PartialValues
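The v1alpha2 changes above also introduce the ClusterConfigSynced condition type and a GenerationObserved helper. A hedged sketch of how a reconciler might record that condition follows; the import path, the reason and message strings, and the assumption that Status.Conditions is a []metav1.Condition managed with apimachinery's meta helpers are illustrative and not taken from this commit.

// Sketch only: recording the new ClusterConfigSynced condition on a Redpanda
// resource. Reason and message values are placeholders.
package controller

import (
	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	v1alpha2 "github.com/redpanda-data/redpanda-operator/operator/api/redpanda/v1alpha2"
)

func markClusterConfigSynced(rp *v1alpha2.Redpanda, synced bool) {
	status, reason := metav1.ConditionTrue, "ConfigApplied"
	if !synced {
		status, reason = metav1.ConditionFalse, "ConfigOutOfSync"
	}
	apimeta.SetStatusCondition(&rp.Status.Conditions, metav1.Condition{
		Type:               v1alpha2.ClusterConfigSynced,
		Status:             status,
		ObservedGeneration: rp.Generation,
		Reason:             reason,
		Message:            "cluster configuration sync state",
	})
}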
159 changes: 110 additions & 49 deletions operator/cmd/syncclusterconfig/sync.go
@@ -19,6 +19,7 @@ package syncclusterconfig
import (
"bufio"
"bytes"
"context"
"fmt"
"os"
"path/filepath"
@@ -74,8 +75,16 @@ If present and not empty, the $%s environment variable will be set as the cluste
return err
}

// do conditional merge of users.txt and bootstrap.yaml
maybeMergeSuperusers(logger, clusterConfig, usersDirectoryPath)
usersTXTs, err := loadUsersFiles(ctx, usersDirectoryPath)
if err != nil {
return err
}

syncer := Syncer{Client: client}

if err := syncer.Sync(ctx, clusterConfig, usersTXTs); err != nil {
return err
}

// NB: remove must be an empty slice NOT nil.
result, err := client.PatchClusterConfig(ctx, clusterConfig, []string{})
@@ -137,45 +146,13 @@ func loadBoostrapYAML(path string) (map[string]any, error) {
return config, nil
}

// maybeMergeSuperusers merges the values found in users.txt into our superusers
// list found in our bootstrap.yaml. This only occurs if an entry for superusers
// actually exists in the bootstrap.yaml.
func maybeMergeSuperusers(logger logr.Logger, clusterConfig map[string]any, path string) {
if path == "" {
// we have no path to a users directory, so don't do anything
logger.Info("--users-directory not specified. Skipping superusers merge.")
return
}

superusersConfig, ok := clusterConfig[superusersEntry]
if !ok {
// we have no superusers configuration, so don't do anything
logger.Info("Configuration does not contain a 'superusers' entry. Skipping superusers merge.")
return
}

superusers, err := loadUsersFiles(logger, path)
if err != nil {
logger.Info(fmt.Sprintf("Error reading users directory %q: %v. Skipping superusers merge.", path, err))
return
}

superusersAny, ok := superusersConfig.([]any)
if !ok {
logger.Info(fmt.Sprintf("Unable to cast superusers entry to array. Skipping superusers merge. Type is: %T", superusersConfig))
return
}

clusterConfig[superusersEntry] = normalizeSuperusers(append(superusers, mapConvertibleTo[string](logger, superusersAny)...))
}

func loadUsersFiles(logger logr.Logger, path string) ([]string, error) {
func loadUsersFiles(ctx context.Context, path string) (map[string][]byte, error) {
files, err := os.ReadDir(path)
if err != nil {
return nil, err
}

users := []string{}
users := map[string][]byte{}

for _, file := range files {
if file.IsDir() {
@@ -186,27 +163,35 @@ func loadUsersFiles(logger logr.Logger, path string) ([]string, error) {

usersFile, err := os.ReadFile(filename)
if err != nil {
logger.Info(fmt.Sprintf("Cannot read user file %q: %v. Skipping.", filename, err))
log.FromContext(ctx).Info(fmt.Sprintf("Cannot read user file %q: %v. Skipping.", filename, err))
continue
}

scanner := bufio.NewScanner(bytes.NewReader(usersFile))
users[file.Name()] = usersFile
}

i := 0
for scanner.Scan() {
i++
return users, nil
}

line := scanner.Text()
tokens := strings.Split(line, ":")
if len(tokens) != 2 && len(tokens) != 3 {
logger.Info(fmt.Sprintf("Skipping malformatted line number %d in file %q", i, filename))
continue
}
users = append(users, tokens[0])
func loadUsersFile(ctx context.Context, filename string, usersFile []byte) []string {
scanner := bufio.NewScanner(bytes.NewReader(usersFile))

users := []string{}

i := 0
for scanner.Scan() {
i++

line := scanner.Text()
tokens := strings.Split(line, ":")
if len(tokens) != 2 && len(tokens) != 3 {
log.FromContext(ctx).Info(fmt.Sprintf("Skipping malformatted line number %d in file %q", i, filename))
continue
}
users = append(users, tokens[0])
}

return users, nil
return users
}

// normalizeSuperusers de-duplicates and sorts the superusers
@@ -240,3 +225,79 @@ func mapConvertibleTo[T any](logger logr.Logger, array []any) []T {

return converted
}

type Syncer struct {
Client *rpadmin.AdminAPI
}

func (s *Syncer) Sync(ctx context.Context, desired map[string]any, usersTXT map[string][]byte) error {
logger := log.FromContext(ctx)

s.maybeMergeSuperusers(ctx, desired, usersTXT)

current, err := s.Client.Config(ctx, false)
if err != nil {
return err
}

var added []string
var changed []string
// NB: toRemove MUST default to an empty array. Otherwise redpanda will reject our request.
removed := []string{}

// TODO: Uncomment this block to make cluster config syncing fully
// declarative. The historical behavior has not been, so this is
// technically a breaking change.
// for key := range current {
// if _, ok := desired[key]; !ok {
// removed = append(removed, key)
// }
// }

for key, value := range desired {
if currentValue, ok := current[key]; !ok {
added = append(added, key)
} else if value != currentValue {
changed = append(changed, key)
}
}

logger.Info("updating cluster config", "added", added, "removed", removed, "changed", changed, "config", desired)

result, err := s.Client.PatchClusterConfig(ctx, desired, removed)
if err != nil {
return err
}

logger.Info("updated cluster configuration", "config_version", result.ConfigVersion)

return nil
}

func (s *Syncer) maybeMergeSuperusers(ctx context.Context, config map[string]any, usersTXT map[string][]byte) {
logger := log.FromContext(ctx)

if len(usersTXT) == 0 {
logger.Info("usersTXT not specified or empty. Skipping superusers merge.")
return
}

superusers := []string{}
for name, content := range usersTXT {
superusers = append(superusers, loadUsersFile(ctx, name, content)...)
}

superusersConfig, ok := config[superusersEntry]
if !ok {
// we have no superusers configuration, so don't do anything
logger.Info("Configuration does not contain a 'superusers' entry. Skipping superusers merge.")
return
}

superusersAny, ok := superusersConfig.([]any)
if !ok {
logger.Info(fmt.Sprintf("Unable to cast superusers entry to array. Skipping superusers merge. Type is: %T", superusersConfig))
return
}

config[superusersEntry] = normalizeSuperusers(append(superusers, mapConvertibleTo[string](logger, superusersAny)...))
}
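
For reference, Syncer.Sync expects usersTXT to hold the raw contents of each users file keyed by file name; loadUsersFile accepts lines with two or three colon-separated fields and keeps only the first field (the user name). A small hypothetical construction of that argument, with made-up file name and credentials:

// Sketch only: building the usersTXT argument for Syncer.Sync. Only the first
// field of each line (the user name) is merged into the superusers list.
package controller

func exampleUsersTXT() map[string][]byte {
	return map[string][]byte{
		"users.txt": []byte("alice:alice-password:SCRAM-SHA-256\nbob:bob-password\n"),
	}
}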
(remaining changed files not shown)
