Skip to content

Commit

Permalink
pkg/kubeapply: Transition to use pkg/kates rather than pkg/k8s
Browse files Browse the repository at this point in the history
Signed-off-by: Luke Shumaker <[email protected]>
  • Loading branch information
LukeShu committed May 2, 2022
1 parent adbb2b7 commit 4fae467
Show file tree
Hide file tree
Showing 9 changed files with 260 additions and 187 deletions.
14 changes: 5 additions & 9 deletions cmd/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

"github.com/datawire/ambassador/v2/pkg/api/agent"
"github.com/datawire/ambassador/v2/pkg/dtest"
"github.com/datawire/ambassador/v2/pkg/k8s"
"github.com/datawire/ambassador/v2/pkg/kates"
"github.com/datawire/ambassador/v2/pkg/kubeapply"
snapshotTypes "github.com/datawire/ambassador/v2/pkg/snapshot/v1"
Expand All @@ -39,7 +38,7 @@ func TestAgentE2E(t *testing.T) {
// applies all k8s yaml to dtest cluster
// ambassador, ambassador-agent, rbac, crds, and a fake agentcom that implements the grpc
// server for the agent
setup(t, ctx, kubeconfig, cli)
setup(t, ctx, cli)

// eh lets make sure the agent came up
time.Sleep(time.Second * 3)
Expand Down Expand Up @@ -67,7 +66,7 @@ func TestAgentE2E(t *testing.T) {
assert.Empty(t, ambSnapshot.Kubernetes.ArgoRollouts, "rollouts found in snapshot")
assert.Empty(t, ambSnapshot.Kubernetes.ArgoApplications, "applications found in snapshot")

applyArgoResources(t, ctx, kubeconfig, cli)
applyArgoResources(t, ctx, cli)
hasArgo = true
reportSnapshot, ambSnapshot = getAgentComSnapshots(t, ctx, kubeconfig, cli, hasArgo)
assert.NotEmpty(t, ambSnapshot.Kubernetes.ArgoRollouts, "No argo rollouts found in snapshot")
Expand Down Expand Up @@ -160,8 +159,7 @@ func snapshotIsSane(ambSnapshot *snapshotTypes.Snapshot, t *testing.T, hasArgo b

return true
}
func applyArgoResources(t *testing.T, ctx context.Context, kubeconfig string, cli *kates.Client) {
kubeinfo := k8s.NewKubeInfo(kubeconfig, "", "")
func applyArgoResources(t *testing.T, ctx context.Context, kubeinfo *kates.Client) {
require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, time.Minute, true, false, "./testdata/argo-rollouts-crd.yaml"))
require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, time.Minute, true, false, "./testdata/argo-application-crd.yaml"))
time.Sleep(3 * time.Second)
Expand Down Expand Up @@ -215,7 +213,7 @@ func yamlFilename(t *testing.T, inFilename, image string) string {
return outFilename
}

func setup(t *testing.T, ctx context.Context, kubeconfig string, cli *kates.Client) {
func setup(t *testing.T, ctx context.Context, kubeinfo *kates.Client) {
require.NoError(t, needsDockerBuilds(ctx, map[string]string{
"AMBASSADOR_DOCKER_IMAGE": "docker/emissary.docker.push.remote",
"KAT_SERVER_DOCKER_IMAGE": "docker/kat-server.docker.push.remote",
Expand All @@ -227,8 +225,6 @@ func setup(t *testing.T, ctx context.Context, kubeconfig string, cli *kates.Clie
crdFile := yamlFilename(t, "../../manifests/emissary/emissary-crds.yaml.in", image)
aesFile := yamlFilename(t, "../../manifests/emissary/emissary-emissaryns.yaml.in", image)

kubeinfo := k8s.NewKubeInfo(kubeconfig, "", "")

require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, time.Minute, true, false, crdFile))
require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, time.Minute, true, false, "./testdata/namespace.yaml"))
require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, 2*time.Minute, true, false, aesFile))
Expand Down Expand Up @@ -264,7 +260,7 @@ func setup(t *testing.T, ctx context.Context, kubeconfig string, cli *kates.Clie
},
})
require.NoError(t, err)
require.NoError(t, cli.Patch(ctx, dep, kates.StrategicMergePatchType, []byte(patch), dep))
require.NoError(t, kubeinfo.Patch(ctx, dep, kates.StrategicMergePatchType, []byte(patch), dep))

time.Sleep(3 * time.Second)
require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, time.Minute, true, false, "./testdata/sample-config.yaml"))
Expand Down
7 changes: 6 additions & 1 deletion cmd/k8sregistryctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/spf13/cobra"

"github.com/datawire/ambassador/v2/pkg/k8s"
"github.com/datawire/ambassador/v2/pkg/kates"
"github.com/datawire/ambassador/v2/pkg/kubeapply"
)

Expand Down Expand Up @@ -119,6 +120,10 @@ func main() {
}

kubeinfo := k8s.NewKubeInfo("", "", "")
kubeclient, err := kates.NewClient(kates.ClientConfig{})
if err != nil {
return err
}

// Part 1: Apply the YAML
//
Expand All @@ -141,7 +146,7 @@ func main() {
}
err = kubeapply.Kubeapply(
cobraCmd.Context(), // context
kubeinfo, // kubeinfo
kubeclient, // kubeclient
time.Minute, // perPhaseTimeout
false, // debug
false, // dryRun
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ require (
google.golang.org/genproto v0.0.0-20220204002441-d6cc3cc0770e
google.golang.org/grpc v1.44.0
google.golang.org/protobuf v1.27.1
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.21.9
k8s.io/apiextensions-apiserver v0.21.9
k8s.io/apimachinery v0.21.9
Expand Down Expand Up @@ -208,6 +207,7 @@ require (
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
k8s.io/apiserver v0.21.9 // indirect
k8s.io/component-base v0.21.9 // indirect
Expand Down
8 changes: 6 additions & 2 deletions pkg/dtest/k8sapply.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"os"
"time"

"github.com/datawire/ambassador/v2/pkg/k8s"
"github.com/datawire/ambassador/v2/pkg/kates"
"github.com/datawire/ambassador/v2/pkg/kubeapply"
"github.com/datawire/dlib/dexec"
"github.com/datawire/dlib/dlog"
Expand All @@ -18,8 +18,12 @@ func K8sApply(ctx context.Context, ver KubeVersion, files ...string) {
os.Setenv("DOCKER_REGISTRY", DockerRegistry(ctx))
}
kubeconfig := KubeVersionConfig(ctx, ver)
err := kubeapply.Kubeapply(ctx, k8s.NewKubeInfo(kubeconfig, "", ""), 300*time.Second, false, false, files...)
kubeclient, err := kates.NewClient(kates.ClientConfig{Kubeconfig: kubeconfig})
if err != nil {
dlog.Errorln(ctx, err)
os.Exit(1)
}
if err := kubeapply.Kubeapply(ctx, kubeclient, 300*time.Second, false, false, files...); err != nil {
dlog.Println(ctx)
dlog.Println(ctx, err)
dlog.Printf(ctx,
Expand Down
5 changes: 5 additions & 0 deletions pkg/kates/aliases.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/version"
Expand Down Expand Up @@ -47,6 +48,10 @@ type TypeMeta = metav1.TypeMeta
type ObjectMeta = metav1.ObjectMeta
type APIResource = metav1.APIResource

// GroupVersionKind is an alias for the upstream schema.GroupVersionKind,
// re-exported so that callers of pkg/kates do not need to import
// k8s.io/apimachinery directly.
type GroupVersionKind = schema.GroupVersionKind

// GroupVersionKindFromAPIVersionAndKind is an alias for
// schema.FromAPIVersionAndKind; it builds a GroupVersionKind from an
// "apiVersion" string and a "kind" string.
var GroupVersionKindFromAPIVersionAndKind = schema.FromAPIVersionAndKind

type Namespace = corev1.Namespace

type ObjectReference = corev1.ObjectReference
Expand Down
15 changes: 15 additions & 0 deletions pkg/kates/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,12 @@ func InCluster() bool {
err == nil && !fi.IsDir()
}

// CurrentNamespace returns the namespace that requests default to when no
// namespace is specified explicitly, as resolved from the client's loaded
// kubeconfig.
func (c *Client) CurrentNamespace() (string, error) {
	namespace, _, err := c.config.ToRawKubeConfigLoader().Namespace()
	return namespace, err
}

// DynamicInterface is an accessor method to the k8s dynamic client
func (c *Client) DynamicInterface() dynamic.Interface {
return c.cli
Expand Down Expand Up @@ -581,6 +587,15 @@ func (lw *lw) Watch(opts ListOptions) (watch.Interface, error) {

// ==

// IsNamespaced reports whether resources of the given (fully-qualified)
// GroupVersionKind are namespace-scoped, as opposed to cluster-scoped,
// according to the client's REST mapper.
func (c *Client) IsNamespaced(gvk GroupVersionKind) (bool, error) {
	restMapping, err := c.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
	if err != nil {
		return false, err
	}
	namespaced := restMapping.Scope.Name() == meta.RESTScopeNameNamespace
	return namespaced, nil
}

func (c *Client) cliFor(mapping *meta.RESTMapping, namespace string) dynamic.ResourceInterface {
cli := c.cli.Resource(mapping.Resource)
if mapping.Scope.Name() == meta.RESTScopeNameNamespace && namespace != NamespaceAll {
Expand Down
55 changes: 26 additions & 29 deletions pkg/kubeapply/kubeapply.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ import (
"syscall"
"time"

"github.com/datawire/ambassador/v2/pkg/k8s"
"github.com/datawire/ambassador/v2/pkg/kates"
"github.com/datawire/dlib/derror"
"github.com/datawire/dlib/dexec"
"github.com/datawire/dlib/dlog"
)

Expand All @@ -26,13 +25,13 @@ var errorDeadlineExceeded = errors.New("timeout exceeded")
// look in the standard default places for cluster configuration. If
// any phase takes longer than perPhaseTimeout to become ready, then
// it returns early with an error.
func Kubeapply(ctx context.Context, kubeinfo *k8s.KubeInfo, perPhaseTimeout time.Duration, debug, dryRun bool, files ...string) error {
func Kubeapply(ctx context.Context, kubeclient *kates.Client, perPhaseTimeout time.Duration, debug, dryRun bool, files ...string) error {
collection, err := CollectYAML(files...)
if err != nil {
return fmt.Errorf("CollectYAML: %w", err)
}

if err = collection.ApplyAndWait(ctx, kubeinfo, perPhaseTimeout, debug, dryRun); err != nil {
if err = collection.ApplyAndWait(ctx, kubeclient, perPhaseTimeout, debug, dryRun); err != nil {
return fmt.Errorf("ApplyAndWait: %w", err)
}

Expand Down Expand Up @@ -92,24 +91,23 @@ func (collection YAMLCollection) addFile(path string) {
// error.
func (collection YAMLCollection) ApplyAndWait(
ctx context.Context,
kubeinfo *k8s.KubeInfo,
kubeclient *kates.Client,
perPhaseTimeout time.Duration,
debug, dryRun bool,
) error {
if kubeinfo == nil {
kubeinfo = k8s.NewKubeInfo("", "", "")
}

phaseNames := make([]string, 0, len(collection))
for phaseName := range collection {
phaseNames = append(phaseNames, phaseName)
}
sort.Strings(phaseNames)

for _, phaseName := range phaseNames {
// Note: applyAndWait takes a separate 'deadline' argument, rather than the
// implicitly using `context.WithDeadline`, so that we can detect whether it's our
// per-phase timeout that triggered, or a broader "everything" timeout on the
// Context.
deadline := time.Now().Add(perPhaseTimeout)
err := applyAndWait(ctx, kubeinfo, deadline, debug, dryRun, collection[phaseName])
if err != nil {
if err := applyAndWait(ctx, kubeclient, deadline, debug, dryRun, collection[phaseName]); err != nil {
if errors.Is(err, errorDeadlineExceeded) {
err = fmt.Errorf("phase %q not ready after %v: %w", phaseName, perPhaseTimeout, err)
}
Expand All @@ -119,17 +117,13 @@ func (collection YAMLCollection) ApplyAndWait(
return nil
}

func applyAndWait(ctx context.Context, kubeinfo *k8s.KubeInfo, deadline time.Time, debug, dryRun bool, sourceFilenames []string) error {
func applyAndWait(ctx context.Context, kubeclient *kates.Client, deadline time.Time, debug, dryRun bool, sourceFilenames []string) error {
expandedFilenames, err := expand(ctx, sourceFilenames)
if err != nil {
return fmt.Errorf("expanding YAML: %w", err)
}

cli, err := k8s.NewClient(kubeinfo)
if err != nil {
return fmt.Errorf("connecting to cluster %v: %w", kubeinfo, err)
}
waiter, err := NewWaiter(cli.Watcher())
waiter, err := NewWaiter(kubeclient)
if err != nil {
return err
}
Expand Down Expand Up @@ -163,7 +157,7 @@ func applyAndWait(ctx context.Context, kubeinfo *k8s.KubeInfo, deadline time.Tim
return fmt.Errorf("waiter: %w", scanErrs)
}

if err := kubectlApply(ctx, kubeinfo, dryRun, expandedFilenames); err != nil {
if err := kubectlApply(ctx, kubeclient, dryRun, expandedFilenames); err != nil {
return err
}

Expand Down Expand Up @@ -196,13 +190,20 @@ func expand(ctx context.Context, names []string) ([]string, error) {
return result, nil
}

func kubectlApply(ctx context.Context, info *k8s.KubeInfo, dryRun bool, filenames []string) error {
args := []string{"apply"}
func kubectlApply(ctx context.Context, kubeclient *kates.Client, dryRun bool, filenames []string) error {
stdio := kates.IOStreams{
In: nil,
Out: dlog.StdLogger(ctx, dlog.LogLevelInfo).Writer(),
ErrOut: dlog.StdLogger(ctx, dlog.LogLevelWarn).Writer(),
}

var args []string
if dryRun {
args = append(args, "--dry-run")
}
for _, filename := range filenames {
// https://github.com/datawire/ambassador/issues/77
// flock(2) each file that we're passing to `kubectl apply`.
// https://github.com/datawire/teleproxy/issues/77
filehandle, err := os.Open(filename)
if err != nil {
return err
Expand All @@ -211,17 +212,13 @@ func kubectlApply(ctx context.Context, info *k8s.KubeInfo, dryRun bool, filename
if err := syscall.Flock(int(filehandle.Fd()), syscall.LOCK_EX); err != nil {
return err
}

// pass the file to `kubectl apply`
args = append(args, "-f", filename)
}
kargs, err := info.GetKubectlArray(args...)
if err != nil {
return err
}
dlog.Printf(ctx, "kubectl %s\n", strings.Join(kargs, " "))
/* #nosec */
if err := dexec.CommandContext(ctx, "kubectl", kargs...).Run(); err != nil {

if err := kubeclient.IncoherentApply(ctx, stdio, args...); err != nil {
return err
}

return nil
}
Loading

0 comments on commit 4fae467

Please sign in to comment.