Skip to content

Commit

Permalink
Increase test coverage (#2225)
Browse files Browse the repository at this point in the history
* Use test wrapper in more tests

Signed-off-by: David Gageot <[email protected]>

* Increase coverage

Signed-off-by: David Gageot <[email protected]>

* Remove file committed by error

Signed-off-by: David Gageot <[email protected]>

* Add unit test for pod watcher

Signed-off-by: David Gageot <[email protected]>

* Remove dead code

Signed-off-by: David Gageot <[email protected]>

* Remove duplication

Signed-off-by: David Gageot <[email protected]>
  • Loading branch information
dgageot authored Jun 6, 2019
1 parent 599e238 commit 5e7a25f
Show file tree
Hide file tree
Showing 7 changed files with 279 additions and 180 deletions.
41 changes: 7 additions & 34 deletions pkg/skaffold/kubernetes/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ import (
)

// WatchUntil reads items from the watch until the provided condition succeeds or the context is cancelled.
func watchUntil(ctx context.Context, w watch.Interface, condition func(event *watch.Event) (bool, error)) error {
func watchUntilTimeout(ctx context.Context, timeout time.Duration, w watch.Interface, condition func(event *watch.Event) (bool, error)) error {
ctx, cancelTimeout := context.WithTimeout(ctx, timeout)
defer cancelTimeout()

for {
select {
case <-ctx.Done():
Expand All @@ -53,27 +56,6 @@ func watchUntil(ctx context.Context, w watch.Interface, condition func(event *wa
}
}

// WaitForPodScheduled waits until the Pod is scheduled.
func WaitForPodScheduled(ctx context.Context, pods corev1.PodInterface, podName string) error {
logrus.Infof("Waiting for %s to be scheduled", podName)

w, err := pods.Watch(meta_v1.ListOptions{
IncludeUninitialized: true,
})
if err != nil {
return fmt.Errorf("initializing pod watcher: %s", err)
}
defer w.Stop()

ctx, cancelTimeout := context.WithTimeout(ctx, 30*time.Second)
defer cancelTimeout()

return watchUntil(ctx, w, func(event *watch.Event) (bool, error) {
pod := event.Object.(*v1.Pod)
return pod.Name == podName, nil
})
}

// WaitForPodComplete waits until the Pod status is complete.
func WaitForPodComplete(ctx context.Context, pods corev1.PodInterface, podName string, timeout time.Duration) error {
logrus.Infof("Waiting for %s to be complete", podName)
Expand All @@ -86,10 +68,7 @@ func WaitForPodComplete(ctx context.Context, pods corev1.PodInterface, podName s
}
defer w.Stop()

ctx, cancelTimeout := context.WithTimeout(ctx, timeout)
defer cancelTimeout()

return watchUntil(ctx, w, func(event *watch.Event) (bool, error) {
return watchUntilTimeout(ctx, timeout, w, func(event *watch.Event) (bool, error) {
if event.Object == nil {
return false, nil
}
Expand Down Expand Up @@ -124,10 +103,7 @@ func WaitForPodInitialized(ctx context.Context, pods corev1.PodInterface, podNam
}
defer w.Stop()

ctx, cancelTimeout := context.WithTimeout(ctx, 10*time.Minute)
defer cancelTimeout()

return watchUntil(ctx, w, func(event *watch.Event) (bool, error) {
return watchUntilTimeout(ctx, 10*time.Minute, w, func(event *watch.Event) (bool, error) {
pod := event.Object.(*v1.Pod)
if pod.Name != podName {
return false, nil
Expand Down Expand Up @@ -157,10 +133,7 @@ func WaitForDeploymentToStabilize(ctx context.Context, c kubernetes.Interface, n
return fmt.Errorf("initializing deployment watcher: %s", err)
}

ctx, cancelTimeout := context.WithTimeout(ctx, timeout)
defer cancelTimeout()

return watchUntil(ctx, w, func(event *watch.Event) (bool, error) {
return watchUntilTimeout(ctx, timeout, w, func(event *watch.Event) (bool, error) {
if event.Type == watch.Deleted {
return false, apierrs.NewNotFound(schema.GroupResource{Resource: "deployments"}, "")
}
Expand Down
40 changes: 20 additions & 20 deletions pkg/skaffold/kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,40 +22,40 @@ import (
"k8s.io/apimachinery/pkg/watch"
)

// PodWatcher returns a watcher that will report on all Pod Events (additions, modifications, etc.)
func PodWatcher(namespace string) (watch.Interface, error) {
kubeclient, err := Client()
if err != nil {
return nil, errors.Wrap(err, "getting k8s client")
}
client := kubeclient.CoreV1()
var forever int64 = 3600 * 24 * 365 * 100
return client.Pods(namespace).Watch(meta_v1.ListOptions{
IncludeUninitialized: true,
TimeoutSeconds: &forever,
})
}

// AggregatePodWatcher returns a watcher for multiple namespaces.
func AggregatePodWatcher(namespaces []string, aggregate chan watch.Event) (func(), error) {
func AggregatePodWatcher(namespaces []string, aggregate chan<- watch.Event) (func(), error) {
watchers := make([]watch.Interface, 0, len(namespaces))
stopWatchers := func() {
for _, w := range watchers {
w.Stop()
}
}

kubeclient, err := Client()
if err != nil {
return func() {}, errors.Wrap(err, "getting k8s client")
}

var forever int64 = 3600 * 24 * 365 * 100

for _, ns := range namespaces {
watcher, err := PodWatcher(ns)
watcher, err := kubeclient.CoreV1().Pods(ns).Watch(meta_v1.ListOptions{
IncludeUninitialized: true,
TimeoutSeconds: &forever,
})
if err != nil {
return stopWatchers, errors.Wrap(err, "initializing pod watcher for "+ns)
stopWatchers()
return func() {}, errors.Wrap(err, "initializing pod watcher for "+ns)
}

watchers = append(watchers, watcher)
go func(w watch.Interface) {
for msg := range w.ResultChan() {

go func() {
for msg := range watcher.ResultChan() {
aggregate <- msg
}
}(watcher)
}()
}

return stopWatchers, nil
}
79 changes: 79 additions & 0 deletions pkg/skaffold/kubernetes/watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
Copyright 2019 The Skaffold Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubernetes

import (
"testing"

"github.com/GoogleContainerTools/skaffold/testutil"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
)

func pod(name string) *v1.Pod {
return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name}}
}

func TestAggregatePodWatcher(t *testing.T) {
testutil.Run(t, "fail to get client", func(t *testutil.T) {
t.Override(&Client, func() (kubernetes.Interface, error) { return nil, errors.New("unable to get client") })

cleanup, err := AggregatePodWatcher([]string{"ns"}, nil)
defer cleanup()

t.CheckErrorContains("unable to get client", err)
})

testutil.Run(t, "fail to watch pods", func(t *testutil.T) {
clientset := fake.NewSimpleClientset()
t.Override(&Client, func() (kubernetes.Interface, error) { return clientset, nil })

clientset.Fake.PrependWatchReactor("pods", func(action k8stesting.Action) (handled bool, ret watch.Interface, err error) {
return true, nil, errors.New("unable to watch")
})

cleanup, err := AggregatePodWatcher([]string{"ns"}, nil)
defer cleanup()

t.CheckErrorContains("unable to watch", err)
})

testutil.Run(t, "watch 3 events", func(t *testutil.T) {
clientset := fake.NewSimpleClientset()
t.Override(&Client, func() (kubernetes.Interface, error) { return clientset, nil })

events := make(chan watch.Event)
cleanup, err := AggregatePodWatcher([]string{"ns1", "ns2"}, events)
defer cleanup()
t.CheckNoError(err)

// Send three events
clientset.CoreV1().Pods("ns1").Create(pod("pod1"))
clientset.CoreV1().Pods("ns2").Create(pod("pod2"))
clientset.CoreV1().Pods("ns2").Create(pod("pod3"))

// Retrieve three events
<-events
<-events
<-events
})
}
18 changes: 18 additions & 0 deletions pkg/skaffold/test/test_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,24 @@ func TestTestDependencies(t *testing.T) {
testutil.CheckErrorAndDeepEqual(t, false, err, expectedDeps, deps)
}

func TestWrongPattern(t *testing.T) {
runCtx := &runcontext.RunContext{
Cfg: &latest.Pipeline{
Test: []*latest.TestCase{
{StructureTests: []string{"[]"}},
},
},
}

tester := NewTester(runCtx)

_, err := tester.TestDependencies()
testutil.CheckError(t, true, err)

err = tester.Test(context.Background(), ioutil.Discard, nil)
testutil.CheckError(t, true, err)
}

func TestNoTest(t *testing.T) {
runCtx := &runcontext.RunContext{
Cfg: &latest.Pipeline{},
Expand Down
Loading

0 comments on commit 5e7a25f

Please sign in to comment.