diff --git a/pkg/skaffold/kubernetes/wait.go b/pkg/skaffold/kubernetes/wait.go index 229c79aa764..60b74454c29 100644 --- a/pkg/skaffold/kubernetes/wait.go +++ b/pkg/skaffold/kubernetes/wait.go @@ -36,7 +36,10 @@ import ( ) // WatchUntil reads items from the watch until the provided condition succeeds or the context is cancelled. -func watchUntil(ctx context.Context, w watch.Interface, condition func(event *watch.Event) (bool, error)) error { +func watchUntilTimeout(ctx context.Context, timeout time.Duration, w watch.Interface, condition func(event *watch.Event) (bool, error)) error { + ctx, cancelTimeout := context.WithTimeout(ctx, timeout) + defer cancelTimeout() + for { select { case <-ctx.Done(): @@ -53,27 +56,6 @@ func watchUntil(ctx context.Context, w watch.Interface, condition func(event *wa } } -// WaitForPodScheduled waits until the Pod is scheduled. -func WaitForPodScheduled(ctx context.Context, pods corev1.PodInterface, podName string) error { - logrus.Infof("Waiting for %s to be scheduled", podName) - - w, err := pods.Watch(meta_v1.ListOptions{ - IncludeUninitialized: true, - }) - if err != nil { - return fmt.Errorf("initializing pod watcher: %s", err) - } - defer w.Stop() - - ctx, cancelTimeout := context.WithTimeout(ctx, 30*time.Second) - defer cancelTimeout() - - return watchUntil(ctx, w, func(event *watch.Event) (bool, error) { - pod := event.Object.(*v1.Pod) - return pod.Name == podName, nil - }) -} - // WaitForPodComplete waits until the Pod status is complete. func WaitForPodComplete(ctx context.Context, pods corev1.PodInterface, podName string, timeout time.Duration) error { logrus.Infof("Waiting for %s to be complete", podName) @@ -86,10 +68,7 @@ func WaitForPodComplete(ctx context.Context, pods corev1.PodInterface, podName s } defer w.Stop() - ctx, cancelTimeout := context.WithTimeout(ctx, timeout) - defer cancelTimeout() - - return watchUntil(ctx, w, func(event *watch.Event) (bool, error) { + return watchUntilTimeout(ctx, timeout, w, func(event *watch.Event) (bool, error) { if event.Object == nil { return false, nil } @@ -124,10 +103,7 @@ func WaitForPodInitialized(ctx context.Context, pods corev1.PodInterface, podNam } defer w.Stop() - ctx, cancelTimeout := context.WithTimeout(ctx, 10*time.Minute) - defer cancelTimeout() - - return watchUntil(ctx, w, func(event *watch.Event) (bool, error) { + return watchUntilTimeout(ctx, 10*time.Minute, w, func(event *watch.Event) (bool, error) { pod := event.Object.(*v1.Pod) if pod.Name != podName { return false, nil @@ -157,10 +133,7 @@ func WaitForDeploymentToStabilize(ctx context.Context, c kubernetes.Interface, n return fmt.Errorf("initializing deployment watcher: %s", err) } - ctx, cancelTimeout := context.WithTimeout(ctx, timeout) - defer cancelTimeout() - - return watchUntil(ctx, w, func(event *watch.Event) (bool, error) { + return watchUntilTimeout(ctx, timeout, w, func(event *watch.Event) (bool, error) { if event.Type == watch.Deleted { return false, apierrs.NewNotFound(schema.GroupResource{Resource: "deployments"}, "") } diff --git a/pkg/skaffold/kubernetes/watcher.go b/pkg/skaffold/kubernetes/watcher.go index 2c2dff0e7ee..ec1e96aea2d 100644 --- a/pkg/skaffold/kubernetes/watcher.go +++ b/pkg/skaffold/kubernetes/watcher.go @@ -22,22 +22,8 @@ import ( "k8s.io/apimachinery/pkg/watch" ) -// PodWatcher returns a watcher that will report on all Pod Events (additions, modifications, etc.) -func PodWatcher(namespace string) (watch.Interface, error) { - kubeclient, err := Client() - if err != nil { - return nil, errors.Wrap(err, "getting k8s client") - } - client := kubeclient.CoreV1() - var forever int64 = 3600 * 24 * 365 * 100 - return client.Pods(namespace).Watch(meta_v1.ListOptions{ - IncludeUninitialized: true, - TimeoutSeconds: &forever, - }) -} - // AggregatePodWatcher returns a watcher for multiple namespaces. -func AggregatePodWatcher(namespaces []string, aggregate chan watch.Event) (func(), error) { +func AggregatePodWatcher(namespaces []string, aggregate chan<- watch.Event) (func(), error) { watchers := make([]watch.Interface, 0, len(namespaces)) stopWatchers := func() { for _, w := range watchers { @@ -45,17 +31,31 @@ func AggregatePodWatcher(namespaces []string, aggregate chan watch.Event) (func( } } + kubeclient, err := Client() + if err != nil { + return func() {}, errors.Wrap(err, "getting k8s client") + } + + var forever int64 = 3600 * 24 * 365 * 100 + for _, ns := range namespaces { - watcher, err := PodWatcher(ns) + watcher, err := kubeclient.CoreV1().Pods(ns).Watch(meta_v1.ListOptions{ + IncludeUninitialized: true, + TimeoutSeconds: &forever, + }) if err != nil { - return stopWatchers, errors.Wrap(err, "initializing pod watcher for "+ns) + stopWatchers() + return func() {}, errors.Wrap(err, "initializing pod watcher for "+ns) } + watchers = append(watchers, watcher) - go func(w watch.Interface) { - for msg := range w.ResultChan() { + + go func() { + for msg := range watcher.ResultChan() { aggregate <- msg } - }(watcher) + }() } + return stopWatchers, nil } diff --git a/pkg/skaffold/kubernetes/watcher_test.go b/pkg/skaffold/kubernetes/watcher_test.go new file mode 100644 index 00000000000..fa36b9c6842 --- /dev/null +++ b/pkg/skaffold/kubernetes/watcher_test.go @@ -0,0 +1,79 @@ +/* +Copyright 2019 The Skaffold Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubernetes + +import ( + "testing" + + "github.com/GoogleContainerTools/skaffold/testutil" + "github.com/pkg/errors" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" +) + +func pod(name string) *v1.Pod { + return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name}} +} + +func TestAggregatePodWatcher(t *testing.T) { + testutil.Run(t, "fail to get client", func(t *testutil.T) { + t.Override(&Client, func() (kubernetes.Interface, error) { return nil, errors.New("unable to get client") }) + + cleanup, err := AggregatePodWatcher([]string{"ns"}, nil) + defer cleanup() + + t.CheckErrorContains("unable to get client", err) + }) + + testutil.Run(t, "fail to watch pods", func(t *testutil.T) { + clientset := fake.NewSimpleClientset() + t.Override(&Client, func() (kubernetes.Interface, error) { return clientset, nil }) + + clientset.Fake.PrependWatchReactor("pods", func(action k8stesting.Action) (handled bool, ret watch.Interface, err error) { + return true, nil, errors.New("unable to watch") + }) + + cleanup, err := AggregatePodWatcher([]string{"ns"}, nil) + defer cleanup() + + t.CheckErrorContains("unable to watch", err) + }) + + testutil.Run(t, "watch 3 events", func(t *testutil.T) { + clientset := fake.NewSimpleClientset() + t.Override(&Client, func() (kubernetes.Interface, error) { return clientset, nil }) + + events := make(chan watch.Event) + cleanup, err := AggregatePodWatcher([]string{"ns1", "ns2"}, events) + defer cleanup() + t.CheckNoError(err) + + // Send three events + clientset.CoreV1().Pods("ns1").Create(pod("pod1")) + clientset.CoreV1().Pods("ns2").Create(pod("pod2")) + clientset.CoreV1().Pods("ns2").Create(pod("pod3")) + + // Retrieve three events + <-events + <-events + <-events + }) +} diff --git a/pkg/skaffold/test/test_test.go b/pkg/skaffold/test/test_test.go index 51c8ad406fb..a9f2614f500 100644 --- a/pkg/skaffold/test/test_test.go +++ b/pkg/skaffold/test/test_test.go @@ -64,6 +64,24 @@ func TestTestDependencies(t *testing.T) { testutil.CheckErrorAndDeepEqual(t, false, err, expectedDeps, deps) } +func TestWrongPattern(t *testing.T) { + runCtx := &runcontext.RunContext{ + Cfg: &latest.Pipeline{ + Test: []*latest.TestCase{ + {StructureTests: []string{"[]"}}, + }, + }, + } + + tester := NewTester(runCtx) + + _, err := tester.TestDependencies() + testutil.CheckError(t, true, err) + + err = tester.Test(context.Background(), ioutil.Discard, nil) + testutil.CheckError(t, true, err) +} + func TestNoTest(t *testing.T) { runCtx := &runcontext.RunContext{ Cfg: &latest.Pipeline{}, diff --git a/pkg/skaffold/util/tar_test.go b/pkg/skaffold/util/tar_test.go index 91ae0e5f651..d4c04fed9f1 100644 --- a/pkg/skaffold/util/tar_test.go +++ b/pkg/skaffold/util/tar_test.go @@ -19,6 +19,7 @@ package util import ( "archive/tar" "bytes" + "compress/gzip" "io" "io/ioutil" "testing" @@ -27,155 +28,178 @@ import ( ) func TestCreateTar(t *testing.T) { - tmpDir, cleanup := testutil.NewTempDir(t) - defer cleanup() - - files := map[string]string{ - "foo": "baz1", - "bar/bat": "baz2", - "bar/baz": "baz3", - } - var paths []string - for path, content := range files { - tmpDir.Write(path, content) - paths = append(paths, path) - } - - reset := testutil.Chdir(t, tmpDir.Root()) - defer reset() - - var b bytes.Buffer - err := CreateTar(&b, ".", paths) - testutil.CheckError(t, false, err) - - // Make sure the contents match. - tarFiles := make(map[string]string) - tr := tar.NewReader(&b) - for { - hdr, err := tr.Next() - if err == io.EOF { - break + testutil.Run(t, "", func(t *testutil.T) { + files := map[string]string{ + "foo": "baz1", + "bar/bat": "baz2", + "bar/baz": "baz3", + } + _, paths := prepareFiles(t, files) + + var b bytes.Buffer + err := CreateTar(&b, ".", paths) + t.CheckNoError(err) + + // Make sure the contents match. + tarFiles := make(map[string]string) + tr := tar.NewReader(&b) + for { + hdr, err := tr.Next() + if err == io.EOF { + break + } + t.CheckNoError(err) + + content, err := ioutil.ReadAll(tr) + t.CheckNoError(err) + + tarFiles[hdr.Name] = string(content) } - testutil.CheckError(t, false, err) - content, err := ioutil.ReadAll(tr) - testutil.CheckError(t, false, err) + t.CheckDeepEqual(files, tarFiles) + }) +} - tarFiles[hdr.Name] = string(content) - } +func TestCreateTarGz(t *testing.T) { + testutil.Run(t, "", func(t *testutil.T) { + files := map[string]string{ + "foo": "baz1", + "bar/bat": "baz2", + "bar/baz": "baz3", + } + _, paths := prepareFiles(t, files) + + var b bytes.Buffer + err := CreateTarGz(&b, ".", paths) + t.CheckNoError(err) + + // Make sure the contents match. + tarFiles := make(map[string]string) + gzr, err := gzip.NewReader(&b) + t.CheckNoError(err) + tr := tar.NewReader(gzr) + for { + hdr, err := tr.Next() + if err == io.EOF { + break + } + t.CheckNoError(err) + + content, err := ioutil.ReadAll(tr) + t.CheckNoError(err) + + tarFiles[hdr.Name] = string(content) + } - testutil.CheckErrorAndDeepEqual(t, false, err, files, tarFiles) + t.CheckDeepEqual(files, tarFiles) + }) } func TestCreateTarSubDirectory(t *testing.T) { - tmpDir, cleanup := testutil.NewTempDir(t) - defer cleanup() - - files := map[string]string{ - "sub/foo": "baz1", - "sub/bar/bat": "baz2", - "sub/bar/baz": "baz3", - } - var paths []string - for path, content := range files { - tmpDir.Write(path, content) - paths = append(paths, path) - } - - reset := testutil.Chdir(t, tmpDir.Root()) - defer reset() - - var b bytes.Buffer - err := CreateTar(&b, "sub", paths) - testutil.CheckError(t, false, err) - - // Make sure the contents match. - tarFiles := make(map[string]string) - tr := tar.NewReader(&b) - for { - hdr, err := tr.Next() - if err == io.EOF { - break + testutil.Run(t, "", func(t *testutil.T) { + files := map[string]string{ + "sub/foo": "baz1", + "sub/bar/bat": "baz2", + "sub/bar/baz": "baz3", + } + _, paths := prepareFiles(t, files) + + var b bytes.Buffer + err := CreateTar(&b, "sub", paths) + t.CheckNoError(err) + + // Make sure the contents match. + tarFiles := make(map[string]string) + tr := tar.NewReader(&b) + for { + hdr, err := tr.Next() + if err == io.EOF { + break + } + t.CheckNoError(err) + + content, err := ioutil.ReadAll(tr) + t.CheckNoError(err) + + tarFiles["sub/"+hdr.Name] = string(content) } - testutil.CheckError(t, false, err) - - content, err := ioutil.ReadAll(tr) - testutil.CheckError(t, false, err) - - tarFiles["sub/"+hdr.Name] = string(content) - } - testutil.CheckErrorAndDeepEqual(t, false, err, files, tarFiles) + t.CheckDeepEqual(files, tarFiles) + }) } func TestCreateTarEmptyFolder(t *testing.T) { - tmpDir, cleanup := testutil.NewTempDir(t) - defer cleanup() - - tmpDir.Mkdir("empty") - - reset := testutil.Chdir(t, tmpDir.Root()) - defer reset() + testutil.Run(t, "", func(t *testutil.T) { + tmpDir := t.NewTempDir() + tmpDir.Mkdir("empty") + t.Chdir(tmpDir.Root()) + + var b bytes.Buffer + err := CreateTar(&b, ".", []string{"empty"}) + t.CheckNoError(err) + + // Make sure the contents match. + var tarFolders []string + tr := tar.NewReader(&b) + for { + hdr, err := tr.Next() + if err == io.EOF { + break + } + t.CheckNoError(err) + + if hdr.FileInfo().IsDir() { + tarFolders = append(tarFolders, hdr.Name) + } + } - var b bytes.Buffer - err := CreateTar(&b, ".", []string{"empty"}) - testutil.CheckError(t, false, err) + t.CheckNoError(err) + t.CheckDeepEqual([]string{"empty"}, tarFolders) + }) +} - // Make sure the contents match. - var tarFolders []string - tr := tar.NewReader(&b) - for { - hdr, err := tr.Next() - if err == io.EOF { - break +func TestCreateTarWithAbsolutePaths(t *testing.T) { + testutil.Run(t, "", func(t *testutil.T) { + files := map[string]string{ + "foo": "baz1", + "bar/bat": "baz2", + "bar/baz": "baz3", } - testutil.CheckError(t, false, err) - - if hdr.FileInfo().IsDir() { - tarFolders = append(tarFolders, hdr.Name) + tmpDir, paths := prepareFiles(t, files) + + var b bytes.Buffer + err := CreateTar(&b, tmpDir.Root(), tmpDir.Paths(paths...)) + t.CheckNoError(err) + + // Make sure the contents match. + tarFiles := make(map[string]string) + tr := tar.NewReader(&b) + for { + hdr, err := tr.Next() + if err == io.EOF { + break + } + t.CheckNoError(err) + + content, err := ioutil.ReadAll(tr) + t.CheckNoError(err) + + tarFiles[hdr.Name] = string(content) } - } - testutil.CheckErrorAndDeepEqual(t, false, err, []string{"empty"}, tarFolders) + t.CheckDeepEqual(files, tarFiles) + }) } -func TestCreateTarWithAbsolutePaths(t *testing.T) { - tmpDir, cleanup := testutil.NewTempDir(t) - defer cleanup() +func prepareFiles(t *testutil.T, files map[string]string) (*testutil.TempDir, []string) { + tmpDir := t.NewTempDir() + t.Chdir(tmpDir.Root()) - files := map[string]string{ - "foo": "baz1", - "bar/bat": "baz2", - "bar/baz": "baz3", - } var paths []string for path, content := range files { tmpDir.Write(path, content) - paths = append(paths, tmpDir.Path(path)) - } - - reset := testutil.Chdir(t, tmpDir.Root()) - defer reset() - - var b bytes.Buffer - err := CreateTar(&b, tmpDir.Root(), paths) - testutil.CheckError(t, false, err) - - // Make sure the contents match. - tarFiles := make(map[string]string) - tr := tar.NewReader(&b) - for { - hdr, err := tr.Next() - if err == io.EOF { - break - } - testutil.CheckError(t, false, err) - - content, err := ioutil.ReadAll(tr) - testutil.CheckError(t, false, err) - - tarFiles[hdr.Name] = string(content) + paths = append(paths, path) } - testutil.CheckErrorAndDeepEqual(t, false, err, files, tarFiles) + return tmpDir, paths } diff --git a/pkg/skaffold/util/util_test.go b/pkg/skaffold/util/util_test.go index b33866ba1ad..077e2615fb0 100644 --- a/pkg/skaffold/util/util_test.go +++ b/pkg/skaffold/util/util_test.go @@ -88,6 +88,11 @@ func TestExpandPathsGlob(t *testing.T) { in: []string{"dir*"}, out: []string{tmpDir.Path("dir/sub_dir/file"), tmpDir.Path("dir_b/sub_dir_b/file")}, }, + { + description: "invalid pattern", + in: []string{"[]"}, + shouldErr: true, + }, } for _, test := range tests { testutil.Run(t, test.description, func(t *testutil.T) { diff --git a/testutil/debug.test b/testutil/debug.test deleted file mode 100755 index e084219837a..00000000000 Binary files a/testutil/debug.test and /dev/null differ