Skip to content

Commit

Permalink
Merge pull request #4311 from fluxcd/kstatus-readiness
Browse files Browse the repository at this point in the history
Check readiness of Flux kinds using kstatus
  • Loading branch information
darkowlzz authored Dec 8, 2023
2 parents 8e3a809 + 05c13fe commit 6135c32
Show file tree
Hide file tree
Showing 23 changed files with 464 additions and 420 deletions.
2 changes: 1 addition & 1 deletion cmd/flux/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (names apiType) upsertAndWait(object upsertWaitable, mutate func() error) e

logger.Waitingf("waiting for %s reconciliation", names.kind)
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isReady(kubeClient, namespacedName, object)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, object.asClientObject())); err != nil {
return err
}
logger.Successf("%s reconciliation completed", names.kind)
Expand Down
22 changes: 1 addition & 21 deletions cmd/flux/create_alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -133,7 +132,7 @@ func createAlertCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for Alert reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isAlertReady(kubeClient, namespacedName, &alert)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, &alert)); err != nil {
return err
}
logger.Successf("Alert %s is ready", name)
Expand Down Expand Up @@ -170,22 +169,3 @@ func upsertAlert(ctx context.Context, kubeClient client.Client,
logger.Successf("Alert updated")
return namespacedName, nil
}

func isAlertReady(kubeClient client.Client, namespacedName types.NamespacedName, alert *notificationv1b2.Alert) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, alert)
if err != nil {
return false, err
}

if c := apimeta.FindStatusCondition(alert.Status.Conditions, meta.ReadyCondition); c != nil {
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
22 changes: 1 addition & 21 deletions cmd/flux/create_alertprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -128,7 +127,7 @@ func createAlertProviderCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for Provider reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isAlertProviderReady(kubeClient, namespacedName, &provider)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, &provider)); err != nil {
return err
}

Expand Down Expand Up @@ -167,22 +166,3 @@ func upsertAlertProvider(ctx context.Context, kubeClient client.Client,
logger.Successf("Provider updated")
return namespacedName, nil
}

func isAlertProviderReady(kubeClient client.Client, namespacedName types.NamespacedName, provider *notificationv1.Provider) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, provider)
if err != nil {
return false, err
}

if c := apimeta.FindStatusCondition(provider.Status.Conditions, meta.ReadyCondition); c != nil {
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
19 changes: 1 addition & 18 deletions cmd/flux/create_helmrelease.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/spf13/cobra"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -304,7 +303,7 @@ func createHelmReleaseCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for HelmRelease reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isHelmReleaseReady(kubeClient, namespacedName, &helmRelease)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, &helmRelease)); err != nil {
return err
}
logger.Successf("HelmRelease %s is ready", name)
Expand Down Expand Up @@ -344,22 +343,6 @@ func upsertHelmRelease(ctx context.Context, kubeClient client.Client,
return namespacedName, nil
}

func isHelmReleaseReady(kubeClient client.Client, namespacedName types.NamespacedName, helmRelease *helmv2.HelmRelease) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, helmRelease)
if err != nil {
return false, err
}

// Confirm the state we are observing is for the current generation
if helmRelease.Generation != helmRelease.Status.ObservedGeneration {
return false, nil
}

return apimeta.IsStatusConditionTrue(helmRelease.Status.Conditions, meta.ReadyCondition), nil
}
}

func validateStrategy(input string) bool {
allowedStrategy := []string{"Revision", "ChartVersion"}

Expand Down
27 changes: 1 addition & 26 deletions cmd/flux/create_kustomization.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -264,7 +263,7 @@ func createKsCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for Kustomization reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isKustomizationReady(kubeClient, namespacedName, &kustomization)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, &kustomization)); err != nil {
return err
}
logger.Successf("Kustomization %s is ready", name)
Expand Down Expand Up @@ -303,27 +302,3 @@ func upsertKustomization(ctx context.Context, kubeClient client.Client,
logger.Successf("Kustomization updated")
return namespacedName, nil
}

func isKustomizationReady(kubeClient client.Client, namespacedName types.NamespacedName, kustomization *kustomizev1.Kustomization) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, kustomization)
if err != nil {
return false, err
}

// Confirm the state we are observing is for the current generation
if kustomization.Generation != kustomization.Status.ObservedGeneration {
return false, nil
}

if c := apimeta.FindStatusCondition(kustomization.Status.Conditions, meta.ReadyCondition); c != nil {
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
22 changes: 1 addition & 21 deletions cmd/flux/create_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -140,7 +139,7 @@ func createReceiverCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for Receiver reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isReceiverReady(kubeClient, namespacedName, &receiver)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, &receiver)); err != nil {
return err
}
logger.Successf("Receiver %s is ready", name)
Expand Down Expand Up @@ -179,22 +178,3 @@ func upsertReceiver(ctx context.Context, kubeClient client.Client,
logger.Successf("Receiver updated")
return namespacedName, nil
}

func isReceiverReady(kubeClient client.Client, namespacedName types.NamespacedName, receiver *notificationv1.Receiver) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, receiver)
if err != nil {
return false, err
}

if c := apimeta.FindStatusCondition(receiver.Status.Conditions, meta.ReadyCondition); c != nil {
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
29 changes: 1 addition & 28 deletions cmd/flux/create_source_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"

sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"

Expand Down Expand Up @@ -205,7 +204,7 @@ func createSourceBucketCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for Bucket source reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isBucketReady(kubeClient, namespacedName, bucket)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, bucket)); err != nil {
return err
}
logger.Successf("Bucket source reconciliation completed")
Expand Down Expand Up @@ -247,29 +246,3 @@ func upsertBucket(ctx context.Context, kubeClient client.Client,
logger.Successf("Bucket source updated")
return namespacedName, nil
}

func isBucketReady(kubeClient client.Client, namespacedName types.NamespacedName, bucket *sourcev1.Bucket) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, bucket)
if err != nil {
return false, err
}

if c := conditions.Get(bucket, meta.ReadyCondition); c != nil {
// Confirm the Ready condition we are observing is for the
// current generation
if c.ObservedGeneration != bucket.GetGeneration() {
return false, nil
}

// Further check the Status
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
29 changes: 1 addition & 28 deletions cmd/flux/create_source_git.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"sigs.k8s.io/yaml"

"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"

sourcev1 "github.com/fluxcd/source-controller/api/v1"

Expand Down Expand Up @@ -326,7 +325,7 @@ func createSourceGitCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for GitRepository source reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isGitRepositoryReady(kubeClient, namespacedName, &gitRepository)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, &gitRepository)); err != nil {
return err
}
logger.Successf("GitRepository source reconciliation completed")
Expand Down Expand Up @@ -368,29 +367,3 @@ func upsertGitRepository(ctx context.Context, kubeClient client.Client,
logger.Successf("GitRepository source updated")
return namespacedName, nil
}

func isGitRepositoryReady(kubeClient client.Client, namespacedName types.NamespacedName, gitRepository *sourcev1.GitRepository) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, gitRepository)
if err != nil {
return false, err
}

if c := conditions.Get(gitRepository, meta.ReadyCondition); c != nil {
// Confirm the Ready condition we are observing is for the
// current generation
if c.ObservedGeneration != gitRepository.GetGeneration() {
return false, nil
}

// Further check the Status
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
11 changes: 11 additions & 0 deletions cmd/flux/create_source_git_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,21 @@ func TestCreateSourceGit(t *testing.T) {
Time: time.Now(),
},
}
repo.Status.ObservedGeneration = repo.GetGeneration()
},
}, {
"Failed",
command,
assertError("failed message"),
func(repo *sourcev1.GitRepository) {
stalledCondition := metav1.Condition{
Type: meta.StalledCondition,
Status: metav1.ConditionTrue,
Reason: sourcev1.URLInvalidReason,
Message: "failed message",
ObservedGeneration: repo.GetGeneration(),
}
apimeta.SetStatusCondition(&repo.Status.Conditions, stalledCondition)
newCondition := metav1.Condition{
Type: meta.ReadyCondition,
Status: metav1.ConditionFalse,
Expand All @@ -195,6 +204,7 @@ func TestCreateSourceGit(t *testing.T) {
ObservedGeneration: repo.GetGeneration(),
}
apimeta.SetStatusCondition(&repo.Status.Conditions, newCondition)
repo.Status.ObservedGeneration = repo.GetGeneration()
},
}, {
"NoArtifact",
Expand All @@ -210,6 +220,7 @@ func TestCreateSourceGit(t *testing.T) {
ObservedGeneration: repo.GetGeneration(),
}
apimeta.SetStatusCondition(&repo.Status.Conditions, newCondition)
repo.Status.ObservedGeneration = repo.GetGeneration()
},
},
}
Expand Down
29 changes: 1 addition & 28 deletions cmd/flux/create_source_helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"os"

"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -232,7 +231,7 @@ func createSourceHelmCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for HelmRepository source reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isHelmRepositoryReady(kubeClient, namespacedName, helmRepository)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, helmRepository)); err != nil {
return err
}
logger.Successf("HelmRepository source reconciliation completed")
Expand Down Expand Up @@ -279,29 +278,3 @@ func upsertHelmRepository(ctx context.Context, kubeClient client.Client,
logger.Successf("source updated")
return namespacedName, nil
}

func isHelmRepositoryReady(kubeClient client.Client, namespacedName types.NamespacedName, helmRepository *sourcev1.HelmRepository) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, helmRepository)
if err != nil {
return false, err
}

if c := conditions.Get(helmRepository, meta.ReadyCondition); c != nil {
// Confirm the Ready condition we are observing is for the
// current generation
if c.ObservedGeneration != helmRepository.GetGeneration() {
return false, nil
}

// Further check the Status
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
Loading

0 comments on commit 6135c32

Please sign in to comment.