Skip to content

Commit

Permalink
feat: wait using kstatus (zarf-dev#3043)
Browse files Browse the repository at this point in the history
Signed-off-by: Austin Abro <[email protected]>
Signed-off-by: ittacco <[email protected]>
  • Loading branch information
AustinAbro321 authored and ittacco committed Oct 18, 2024
1 parent 0de7dc8 commit 3b1ff1d
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 121 deletions.
2 changes: 2 additions & 0 deletions site/src/content/docs/ref/deploy.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ Deployments will wait for helm [post-install hooks](https://helm.sh/docs/topics/

:::

After the Helm wait completes successfully, Zarf waits for all resources in the applied chart to fully reconcile. To identify when reconciliation is achieved, Zarf uses [kstatus](https://github.com/kubernetes-sigs/cli-utils/blob/master/pkg/kstatus/README.md#kstatus). Kstatus assesses whether a resource is reconciled by checking the [status](https://kubernetes.io/docs/concepts/overview/working-with-objects/#object-spec-and-status) field. If a resource does not have a status field, kstatus considers it reconciled once it's found.

### Timeout Settings

The default timeout for Helm operations in Zarf is 15 minutes.
Expand Down
40 changes: 40 additions & 0 deletions src/internal/healthchecks/healthchecks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: 2021-Present The Zarf Authors

// Package healthchecks run kstatus style health checks on a list of objects
package healthchecks

import (
"context"

pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"
"github.com/zarf-dev/zarf/src/api/v1alpha1"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
"sigs.k8s.io/cli-utils/pkg/object"
)

// Run waits for a list of objects to be reconciled
func Run(ctx context.Context, watcher watcher.StatusWatcher, healthChecks []v1alpha1.NamespacedObjectKindReference) error {
objs := []object.ObjMetadata{}
for _, hc := range healthChecks {
gv, err := schema.ParseGroupVersion(hc.APIVersion)
if err != nil {
return err
}
obj := object.ObjMetadata{
GroupKind: schema.GroupKind{
Group: gv.Group,
Kind: hc.Kind,
},
Namespace: hc.Namespace,
Name: hc.Name,
}
objs = append(objs, obj)
}
err := pkgkubernetes.WaitForReady(ctx, watcher, objs)
if err != nil {
return err
}
return nil
}
96 changes: 96 additions & 0 deletions src/internal/healthchecks/healthchecks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: 2021-Present The Zarf Authors

// Package healthchecks run kstatus style health checks on a list of objects
package healthchecks

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/zarf-dev/zarf/src/api/v1alpha1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/yaml"
dynamicfake "k8s.io/client-go/dynamic/fake"
"k8s.io/kubectl/pkg/scheme"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
"sigs.k8s.io/cli-utils/pkg/testutil"
)

var podCurrentYaml = `
apiVersion: v1
kind: Pod
metadata:
name: good-pod
namespace: ns
status:
conditions:
- type: Ready
status: "True"
phase: Running
`

var podYaml = `
apiVersion: v1
kind: Pod
metadata:
name: in-progress-pod
namespace: ns
`

func TestRunHealthChecks(t *testing.T) {
t.Parallel()
tests := []struct {
name string
podYaml string
expectErr error
}{
{
name: "Pod is running",
podYaml: podCurrentYaml,
expectErr: nil,
},
{
name: "Pod is never ready",
podYaml: podYaml,
expectErr: context.DeadlineExceeded,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
fakeClient := dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
fakeMapper := testutil.NewFakeRESTMapper(
v1.SchemeGroupVersion.WithKind("Pod"),
)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
m := make(map[string]interface{})
err := yaml.Unmarshal([]byte(tt.podYaml), &m)
require.NoError(t, err)
pod := &unstructured.Unstructured{Object: m}
statusWatcher := watcher.NewDefaultStatusWatcher(fakeClient, fakeMapper)
podGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
require.NoError(t, fakeClient.Tracker().Create(podGVR, pod, pod.GetNamespace()))
objs := []v1alpha1.NamespacedObjectKindReference{
{
APIVersion: pod.GetAPIVersion(),
Kind: pod.GetKind(),
Namespace: pod.GetNamespace(),
Name: pod.GetName(),
},
}
err = Run(ctx, statusWatcher, objs)
if tt.expectErr != nil {
require.ErrorIs(t, err, tt.expectErr)
return
}
require.NoError(t, err)
})
}
}
36 changes: 33 additions & 3 deletions src/internal/packager/helm/chart.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package helm

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -24,7 +25,9 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/yaml"

"github.com/zarf-dev/zarf/src/api/v1alpha1"
"github.com/zarf-dev/zarf/src/config"
"github.com/zarf-dev/zarf/src/internal/healthchecks"
"github.com/zarf-dev/zarf/src/pkg/message"
"github.com/zarf-dev/zarf/src/types"
)
Expand Down Expand Up @@ -58,6 +61,10 @@ func (h *Helm) InstallOrUpgradeChart(ctx context.Context) (types.ConnectStrings,
}

histClient := action.NewHistory(h.actionConfig)
var release *release.Release

helmCtx, helmCtxCancel := context.WithTimeout(ctx, h.timeout)
defer helmCtxCancel()

err = retry.Do(func() error {
var err error
Expand All @@ -70,16 +77,15 @@ func (h *Helm) InstallOrUpgradeChart(ctx context.Context) (types.ConnectStrings,
// No prior release, try to install it.
spinner.Updatef("Attempting chart installation")

_, err = h.installChart(ctx, postRender)
release, err = h.installChart(helmCtx, postRender)
} else if histErr == nil && len(releases) > 0 {
// Otherwise, there is a prior release so upgrade it.
spinner.Updatef("Attempting chart upgrade")

lastRelease := releases[len(releases)-1]

_, err = h.upgradeChart(ctx, lastRelease, postRender)
release, err = h.upgradeChart(helmCtx, lastRelease, postRender)
} else {
// 😭 things aren't working
return fmt.Errorf("unable to verify the chart installation status: %w", histErr)
}

Expand Down Expand Up @@ -118,6 +124,30 @@ func (h *Helm) InstallOrUpgradeChart(ctx context.Context) (types.ConnectStrings,
return nil, "", installErr
}

resourceList, err := h.actionConfig.KubeClient.Build(bytes.NewBufferString(release.Manifest), true)
if err != nil {
return nil, "", fmt.Errorf("unable to build the resource list: %w", err)
}

healthChecks := []v1alpha1.NamespacedObjectKindReference{}
for _, resource := range resourceList {
apiVersion, kind := resource.Object.GetObjectKind().GroupVersionKind().ToAPIVersionAndKind()
healthChecks = append(healthChecks, v1alpha1.NamespacedObjectKindReference{
APIVersion: apiVersion,
Kind: kind,
Name: resource.Name,
Namespace: resource.Namespace,
})
}
if !h.chart.NoWait {
// Ensure we don't go past the timeout by using a context initialized with the helm timeout
spinner.Updatef("Running health checks")
if err := healthchecks.Run(helmCtx, h.cluster.Watcher, healthChecks); err != nil {
return nil, "", err
}
}
spinner.Success()

// return any collected connect strings for zarf connect.
return postRender.connectStrings, h.chart.ReleaseName, nil
}
Expand Down
31 changes: 2 additions & 29 deletions src/pkg/packager/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,16 @@ import (
"golang.org/x/sync/errgroup"

"github.com/avast/retry-go/v4"
pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
"sigs.k8s.io/cli-utils/pkg/object"

"github.com/defenseunicorns/pkg/helpers/v2"
"github.com/zarf-dev/zarf/src/api/v1alpha1"
"github.com/zarf-dev/zarf/src/config"
"github.com/zarf-dev/zarf/src/internal/git"
"github.com/zarf-dev/zarf/src/internal/gitea"
"github.com/zarf-dev/zarf/src/internal/healthchecks"
"github.com/zarf-dev/zarf/src/internal/packager/helm"
"github.com/zarf-dev/zarf/src/internal/packager/images"
"github.com/zarf-dev/zarf/src/internal/packager/template"
Expand Down Expand Up @@ -220,30 +217,6 @@ func (p *Packager) deployComponents(ctx context.Context) ([]types.DeployedCompon
return deployedComponents, nil
}

func runHealthChecks(ctx context.Context, watcher watcher.StatusWatcher, healthChecks []v1alpha1.NamespacedObjectKindReference) error {
objs := []object.ObjMetadata{}
for _, hc := range healthChecks {
gv, err := schema.ParseGroupVersion(hc.APIVersion)
if err != nil {
return err
}
obj := object.ObjMetadata{
GroupKind: schema.GroupKind{
Group: gv.Group,
Kind: hc.Kind,
},
Namespace: hc.Namespace,
Name: hc.Name,
}
objs = append(objs, obj)
}
err := pkgkubernetes.WaitForReady(ctx, watcher, objs)
if err != nil {
return err
}
return nil
}

func (p *Packager) deployInitComponent(ctx context.Context, component v1alpha1.ZarfComponent) ([]types.InstalledChart, error) {
hasExternalRegistry := p.cfg.InitOpts.RegistryInfo.Address != ""
isSeedRegistry := component.Name == "zarf-seed-registry"
Expand Down Expand Up @@ -385,7 +358,7 @@ func (p *Packager) deployComponent(ctx context.Context, component v1alpha1.ZarfC
defer cancel()
spinner := message.NewProgressSpinner("Running health checks")
defer spinner.Stop()
if err = runHealthChecks(healthCheckContext, p.cluster.Watcher, component.HealthChecks); err != nil {
if err = healthchecks.Run(healthCheckContext, p.cluster.Watcher, component.HealthChecks); err != nil {
return nil, fmt.Errorf("health checks failed: %w", err)
}
spinner.Success()
Expand Down
89 changes: 0 additions & 89 deletions src/pkg/packager/deploy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,12 @@
package packager

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/zarf-dev/zarf/src/api/v1alpha1"
"github.com/zarf-dev/zarf/src/pkg/packager/sources"
"github.com/zarf-dev/zarf/src/types"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/yaml"
dynamicfake "k8s.io/client-go/dynamic/fake"
"k8s.io/kubectl/pkg/scheme"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
"sigs.k8s.io/cli-utils/pkg/testutil"
)

func TestGenerateValuesOverrides(t *testing.T) {
Expand Down Expand Up @@ -282,82 +272,3 @@ func TestServiceInfoFromServiceURL(t *testing.T) {
})
}
}

var podCurrentYaml = `
apiVersion: v1
kind: Pod
metadata:
name: good-pod
namespace: ns
status:
conditions:
- type: Ready
status: "True"
phase: Running
`

var podYaml = `
apiVersion: v1
kind: Pod
metadata:
name: in-progress-pod
namespace: ns
`

func yamlToUnstructured(t *testing.T, yml string) *unstructured.Unstructured {
t.Helper()
m := make(map[string]interface{})
err := yaml.Unmarshal([]byte(yml), &m)
require.NoError(t, err)
return &unstructured.Unstructured{Object: m}
}

func TestRunHealthChecks(t *testing.T) {
t.Parallel()
tests := []struct {
name string
podYaml string
expectErr error
}{
{
name: "Pod is running",
podYaml: podCurrentYaml,
expectErr: nil,
},
{
name: "Pod is never ready",
podYaml: podYaml,
expectErr: context.DeadlineExceeded,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
fakeClient := dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
fakeMapper := testutil.NewFakeRESTMapper(
v1.SchemeGroupVersion.WithKind("Pod"),
)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
pod := yamlToUnstructured(t, tt.podYaml)
statusWatcher := watcher.NewDefaultStatusWatcher(fakeClient, fakeMapper)
podGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
require.NoError(t, fakeClient.Tracker().Create(podGVR, pod, pod.GetNamespace()))
objs := []v1alpha1.NamespacedObjectKindReference{
{
APIVersion: pod.GetAPIVersion(),
Kind: pod.GetKind(),
Namespace: pod.GetNamespace(),
Name: pod.GetName(),
},
}
err := runHealthChecks(ctx, statusWatcher, objs)
if tt.expectErr != nil {
require.ErrorIs(t, err, tt.expectErr)
return
}
require.NoError(t, err)
})
}
}

0 comments on commit 3b1ff1d

Please sign in to comment.