Skip to content
This repository has been archived by the owner on Mar 28, 2020. It is now read-only.

Commit

Permalink
Merge pull request #1694 from hongchaodeng/t
Browse files Browse the repository at this point in the history
test: fix wait pod running in log collector
  • Loading branch information
hongchaodeng authored Nov 23, 2017
2 parents bd05548 + 117577e commit 62d20d4
Showing 1 changed file with 38 additions and 5 deletions.
43 changes: 38 additions & 5 deletions test/logcollector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package main

import (
"errors"
"flag"
"io"
"os"
Expand All @@ -23,7 +24,9 @@ import (

"github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -53,10 +56,19 @@ func main() {
podListWatcher := cache.NewListWatchFromClient(kubecli.CoreV1().RESTClient(), "pods", *ns, fields.Everything())
_, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
go func(namespace, logsDir string) {
// TODO: wait until pod is running
time.Sleep(5 * time.Second)
go func(pod *v1.Pod, namespace, logsDir string) {
watcher, err := kubecli.CoreV1().Pods(namespace).Watch(metav1.SingleObject(metav1.ObjectMeta{Name: pod.Name}))
if err != nil {
logrus.Errorf("failed to watch for pod (%s): %v", pod.Name, err)
return
}
// We need to wait Pod Running before collecting its log
ev, err := watch.Until(100*time.Second, watcher, podRunning)
if err != nil {
logrus.Errorf("failed to reach Running for pod (%s): %v", pod.Name, err)
return
}
pod = ev.Object.(*v1.Pod)

req := kubecli.CoreV1().Pods(namespace).GetLogs(pod.Name, &v1.PodLogOptions{Follow: true})
readCloser, err := req.Stream()
Expand All @@ -75,13 +87,14 @@ func main() {
if err != nil {
logrus.Errorf("failed to write log for pod (%s): %v", pod.Name, err)
}
}(*ns, *logsDir)
}(obj.(*v1.Pod), *ns, *logsDir)
},
UpdateFunc: func(old, new interface{}) {
pod := new.(*v1.Pod)
if pod.Name != *e2ePodName {
return
}
// If e2e test pod runs to completion, then stops this program.
if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
close(stopCh)
}
Expand All @@ -93,3 +106,23 @@ func main() {
informer.Run(stopCh)
logrus.Info("start collecting logs...")
}

// **NOTE**: Copy from kubernetes.
// podRunning returns true if the pod is running, false if the pod has not yet reached running state,
// returns ErrPodCompleted if the pod has run to completion, or an error in any other case.
func podRunning(event watch.Event) (bool, error) {
switch event.Type {
case watch.Deleted:
return false, errors.New("pod deleted")
}
switch t := event.Object.(type) {
case *v1.Pod:
switch t.Status.Phase {
case v1.PodRunning:
return true, nil
case v1.PodFailed, v1.PodSucceeded:
return false, errors.New("pod ran to completion")
}
}
return false, nil
}

0 comments on commit 62d20d4

Please sign in to comment.