Skip to content

Commit

Permalink
Collect workload pod logs for all namespaces
Browse files Browse the repository at this point in the history
  • Loading branch information
Cecile Robert-Michon committed Jul 25, 2022
1 parent 454aeb1 commit 7f90d4b
Showing 1 changed file with 31 additions and 32 deletions.
63 changes: 31 additions & 32 deletions test/e2e/azure_clusterproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
expv1 "sigs.k8s.io/cluster-api-provider-azure/exp/api/v1beta1"
clusterv1exp "sigs.k8s.io/cluster-api/exp/api/v1beta1"
"sigs.k8s.io/cluster-api/test/framework"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type (
Expand Down Expand Up @@ -93,7 +92,7 @@ func (acp *AzureClusterProxy) CollectWorkloadClusterLogs(ctx context.Context, na

aboveMachinesPath := strings.Replace(outputPath, "/machines", "", 1)

Byf("Dumping workload cluster %s/%s kube-system pod logs", namespace, name)
Byf("Dumping workload cluster %s/%s pod logs", namespace, name)
start := time.Now()
acp.collectPodLogs(ctx, namespace, name, aboveMachinesPath)
Byf("Fetching kube-system pod logs took %s", time.Since(start).String())
Expand All @@ -107,32 +106,38 @@ func (acp *AzureClusterProxy) CollectWorkloadClusterLogs(ctx context.Context, na
func (acp *AzureClusterProxy) collectPodLogs(ctx context.Context, namespace string, name string, aboveMachinesPath string) {
workload := acp.GetWorkloadCluster(ctx, namespace, name)
pods := &corev1.PodList{}
Expect(workload.GetClient().List(ctx, pods, client.InNamespace(kubesystem))).To(Succeed())

events, err := workload.GetClientSet().CoreV1().Events(kubesystem).List(ctx, metav1.ListOptions{})
eventMsgs := map[string]string{}
if err != nil {
Byf("failed to get events in kube-system namespace: %v", err)
} else {
for _, event := range events.Items {
if event.InvolvedObject.Kind == "Pod" {
if _, ok := eventMsgs[event.InvolvedObject.Name]; !ok {
eventMsgs[event.InvolvedObject.Name] = event.Message
} else {
eventMsgs[event.InvolvedObject.Name] += fmt.Sprintf("\n%s", event.Message)
}
Expect(workload.GetClient().List(ctx, pods)).To(Succeed())

var events map[string]*corev1.EventList
var err error

for _, pod := range pods.Items {
podNamespace := pod.GetNamespace()

// Collect events for Pod.
if _, ok := events[podNamespace]; !ok {
events[podNamespace], err = workload.GetClientSet().CoreV1().Events(podNamespace).List(ctx, metav1.ListOptions{})
if err != nil {
Byf("failed to get events in %s namespace: %v", podNamespace, err)
}
}

var eventMsgs string

for _, event := range events[podNamespace].Items {
if event.InvolvedObject.Kind == "Pod" && event.InvolvedObject.Name == pod.GetName() {
eventMsgs += fmt.Sprintf("%s\n", event.Message)
}
}
}

for _, pod := range pods.Items {
for _, container := range pod.Spec.Containers {
// Watch each container's logs in a goroutine so we can stream them all concurrently.
go func(pod corev1.Pod, container corev1.Container) {
defer GinkgoRecover()

Byf("Creating log watcher for controller %s/%s, container %s", kubesystem, pod.Name, container.Name)
logFile := path.Join(aboveMachinesPath, kubesystem, pod.Name, container.Name+".log")
Byf("Creating log watcher for controller %s/%s, container %s", podNamespace, pod.Name, container.Name)
logFile := path.Join(aboveMachinesPath, podNamespace, pod.Name, container.Name+".log")
if os.MkdirAll(filepath.Dir(logFile), 0755); err != nil {
// Failing to mkdir should not cause the test to fail
Byf("Error mkdir: %v", err)
Expand All @@ -152,10 +157,10 @@ func (acp *AzureClusterProxy) collectPodLogs(ctx context.Context, namespace stri
Follow: true,
}

podLogs, err := workload.GetClientSet().CoreV1().Pods(kubesystem).GetLogs(pod.Name, opts).Stream(ctx)
podLogs, err := workload.GetClientSet().CoreV1().Pods(podNamespace).GetLogs(pod.Name, opts).Stream(ctx)
if err != nil {
// Failing to stream logs should not cause the test to fail
Byf("Error starting logs stream for pod %s/%s, container %s: %v", kubesystem, pod.Name, container.Name, err)
Byf("Error starting logs stream for pod %s/%s, container %s: %v", podNamespace, pod.Name, container.Name, err)
return
}
defer podLogs.Close()
Expand All @@ -165,16 +170,16 @@ func (acp *AzureClusterProxy) collectPodLogs(ctx context.Context, namespace stri
_, err = out.ReadFrom(podLogs)
if err != nil && err != io.ErrUnexpectedEOF {
// Failing to stream logs should not cause the test to fail
Byf("Got error while streaming logs for pod %s/%s, container %s: %v", kubesystem, pod.Name, container.Name, err)
Byf("Got error while streaming logs for pod %s/%s, container %s: %v", podNamespace, pod.Name, container.Name, err)
}
}(pod, container)
}

go func(pod corev1.Pod) {
defer GinkgoRecover()

Byf("Collecting events for Pod %s/%s", kubesystem, pod.Name)
eventFile := path.Join(aboveMachinesPath, kubesystem, pod.Name, "pod-events.txt")
Byf("Collecting events for Pod %s/%s", podNamespace, pod.Name)
eventFile := path.Join(aboveMachinesPath, podNamespace, pod.Name, "pod-events.txt")
if err := os.MkdirAll(filepath.Dir(eventFile), 0755); err != nil {
// Failing to mkdir should not cause the test to fail
Byf("Error mkdir: %v", err)
Expand All @@ -189,18 +194,12 @@ func (acp *AzureClusterProxy) collectPodLogs(ctx context.Context, namespace stri
}
defer f.Close()

msg, ok := eventMsgs[pod.Name]
if !ok {
Byf("failed to find events of Pod %q", pod.Name)
return
}

out := bufio.NewWriter(f)
defer out.Flush()
_, err = out.WriteString(msg)
_, err = out.WriteString(eventMsgs)
if err != nil && err != io.ErrUnexpectedEOF {
// Failing to collect event message should not cause the test to fail
Byf("failed to collect event message of pod %s/%s: %v", kubesystem, pod.Name, err)
Byf("failed to collect event message of pod %s/%s: %v", podNamespace, pod.Name, err)
}
}(pod)
}
Expand Down

0 comments on commit 7f90d4b

Please sign in to comment.