Skip to content

Commit

Permalink
Merge pull request #7039 from Nordix/refactor-watch-deployment-logs/adil
Browse files Browse the repository at this point in the history
🌱Added WatchDeploymentLogsByLabelSelector function
  • Loading branch information
k8s-ci-robot authored Jan 10, 2023
2 parents 3806aaa + 3c636ea commit 281297c
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 32 deletions.
1 change: 1 addition & 0 deletions docs/book/src/developer/providers/v1.3-to-v1.4.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,6 @@ maintainers of providers and consumers of our Go API.
### Other

- `clusterctl upgrade apply` no longer requires a namespace when updating providers. It is now optional and in a future release it will be deprecated. The new syntax is `[namespace/]provider:version`.
- `WatchDeploymentLogs` is changed to `WatchDeploymentLogsByName`, it works same as before. Another function `WatchDeploymentLogsByLabelSelector` is added to stream logs of deployment by label selector.

### Suggested changes for providers
4 changes: 2 additions & 2 deletions test/framework/clusterctl/clusterctl_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func InitManagementClusterAndWatchControllerLogs(ctx context.Context, input Init
}, intervals...)

// Start streaming logs from all controller providers
framework.WatchDeploymentLogs(ctx, framework.WatchDeploymentLogsInput{
framework.WatchDeploymentLogsByName(ctx, framework.WatchDeploymentLogsByNameInput{
GetLister: client,
ClientSet: input.ClusterProxy.GetClientSet(),
Deployment: deployment,
Expand Down Expand Up @@ -188,7 +188,7 @@ func UpgradeManagementClusterAndWait(ctx context.Context, input UpgradeManagemen
}, intervals...)

// Start streaming logs from all controller providers
framework.WatchDeploymentLogs(ctx, framework.WatchDeploymentLogsInput{
framework.WatchDeploymentLogsByName(ctx, framework.WatchDeploymentLogsByNameInput{
GetLister: client,
ClientSet: input.ClusterProxy.GetClientSet(),
Deployment: deployment,
Expand Down
119 changes: 89 additions & 30 deletions test/framework/deployment_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,34 +95,56 @@ func DescribeFailedDeployment(input WaitForDeploymentsAvailableInput, deployment
return b.String()
}

// WatchDeploymentLogsInput is the input for WatchDeploymentLogs.
type WatchDeploymentLogsInput struct {
// WatchDeploymentLogsByLabelSelectorInput is the input for WatchDeploymentLogsByLabelSelector.
type WatchDeploymentLogsByLabelSelectorInput struct {
GetLister GetLister
ClientSet *kubernetes.Clientset
Labels map[string]string
LogPath string
}

// WatchDeploymentLogsByLabelSelector streams logs for all containers for all pods belonging to a deployment on the basis of label. Each container's logs are streamed
// in a separate goroutine so they can all be streamed concurrently. This only causes a test failure if there are errors
// retrieving the deployment, its pods, or setting up a log file. If there is an error with the log streaming itself,
// that does not cause the test to fail.
func WatchDeploymentLogsByLabelSelector(ctx context.Context, input WatchDeploymentLogsByLabelSelectorInput) {
Expect(ctx).NotTo(BeNil(), "ctx is required for WatchDeploymentLogsByLabelSelector")
Expect(input.ClientSet).NotTo(BeNil(), "input.ClientSet is required for WatchDeploymentLogsByLabelSelector")
Expect(input.Labels).NotTo(BeNil(), "input.Selector is required for WatchDeploymentLogsByLabelSelector")

deploymentList := &appsv1.DeploymentList{}
Eventually(func() error {
return input.GetLister.List(ctx, deploymentList, client.MatchingLabels(input.Labels))
}, retryableOperationTimeout, retryableOperationInterval).Should(Succeed(), "Failed to get deployment for labels")

for _, deployment := range deploymentList.Items {
watchPodLogs(ctx, watchPodLogsInput{
GetLister: input.GetLister,
ClientSet: input.ClientSet,
Namespace: deployment.Namespace,
DeploymentName: deployment.Name,
Labels: deployment.Spec.Selector.MatchLabels,
LogPath: input.LogPath,
})
}
}

// WatchDeploymentLogsByNameInput is the input for WatchDeploymentLogsByName.
type WatchDeploymentLogsByNameInput struct {
GetLister GetLister
ClientSet *kubernetes.Clientset
Deployment *appsv1.Deployment
LogPath string
}

// logMetadata contains metadata about the logs.
// The format is very similar to the one used by promtail.
type logMetadata struct {
Job string `json:"job"`
Namespace string `json:"namespace"`
App string `json:"app"`
Pod string `json:"pod"`
Container string `json:"container"`
NodeName string `json:"node_name"`
Stream string `json:"stream"`
}

// WatchDeploymentLogs streams logs for all containers for all pods belonging to a deployment. Each container's logs are streamed
// WatchDeploymentLogsByName streams logs for all containers for all pods belonging to a deployment. Each container's logs are streamed
// in a separate goroutine so they can all be streamed concurrently. This only causes a test failure if there are errors
// retrieving the deployment, its pods, or setting up a log file. If there is an error with the log streaming itself,
// that does not cause the test to fail.
func WatchDeploymentLogs(ctx context.Context, input WatchDeploymentLogsInput) {
Expect(ctx).NotTo(BeNil(), "ctx is required for WatchControllerLogs")
Expect(input.ClientSet).NotTo(BeNil(), "input.ClientSet is required for WatchControllerLogs")
Expect(input.Deployment).NotTo(BeNil(), "input.Deployment is required for WatchControllerLogs")
func WatchDeploymentLogsByName(ctx context.Context, input WatchDeploymentLogsByNameInput) {
Expect(ctx).NotTo(BeNil(), "ctx is required for WatchDeploymentLogsByName")
Expect(input.ClientSet).NotTo(BeNil(), "input.ClientSet is required for WatchDeploymentLogsByName")
Expect(input.Deployment).NotTo(BeNil(), "input.Deployment is required for WatchDeploymentLogsByName")

deployment := &appsv1.Deployment{}
key := client.ObjectKeyFromObject(input.Deployment)
Expand All @@ -131,23 +153,47 @@ func WatchDeploymentLogs(ctx context.Context, input WatchDeploymentLogsInput) {
}, retryableOperationTimeout, retryableOperationInterval).Should(Succeed(), "Failed to get deployment %s", klog.KObj(input.Deployment))

selector, err := metav1.LabelSelectorAsMap(deployment.Spec.Selector)
Expect(err).NotTo(HaveOccurred(), "Failed to Pods selector for deployment %s", klog.KObj(input.Deployment))
Expect(err).NotTo(HaveOccurred(), "Failed to create Pods selector for deployment %s", klog.KObj(input.Deployment))
watchPodLogs(ctx, watchPodLogsInput{
GetLister: input.GetLister,
ClientSet: input.ClientSet,
Namespace: deployment.Namespace,
DeploymentName: deployment.Name,
Labels: selector,
LogPath: input.LogPath,
})
}

// watchPodLogsInput is the input for watchPodLogs.
type watchPodLogsInput struct {
GetLister GetLister
ClientSet *kubernetes.Clientset
Namespace string
DeploymentName string
Labels map[string]string
LogPath string
}

// watchPodLogs streams logs for all containers for all pods belonging to a deployment with the given label. Each container's logs are streamed
// in a separate goroutine so they can all be streamed concurrently. This only causes a test failure if there are errors
// retrieving the deployment, its pods, or setting up a log file. If there is an error with the log streaming itself,
// that does not cause the test to fail.
func watchPodLogs(ctx context.Context, input watchPodLogsInput) {
pods := &corev1.PodList{}
Expect(input.GetLister.List(ctx, pods, client.InNamespace(input.Deployment.Namespace), client.MatchingLabels(selector))).To(Succeed(), "Failed to list Pods for deployment %s", klog.KObj(input.Deployment))
Expect(input.GetLister.List(ctx, pods, client.InNamespace(input.Namespace), client.MatchingLabels(input.Labels))).To(Succeed(), "Failed to list Pods for deployment %s", klog.KRef(input.Namespace, input.DeploymentName))

for _, pod := range pods.Items {
for _, container := range deployment.Spec.Template.Spec.Containers {
log.Logf("Creating log watcher for controller %s, pod %s, container %s", klog.KObj(input.Deployment), pod.Name, container.Name)
for _, container := range pod.Spec.Containers {
log.Logf("Creating log watcher for controller %s, pod %s, container %s", klog.KRef(input.Namespace, input.DeploymentName), pod.Name, container.Name)

// Create log metadata file.
logMetadataFile := filepath.Clean(path.Join(input.LogPath, input.Deployment.Name, pod.Name, container.Name+"-log-metadata.json"))
logMetadataFile := filepath.Clean(path.Join(input.LogPath, input.DeploymentName, pod.Name, container.Name+"-log-metadata.json"))
Expect(os.MkdirAll(filepath.Dir(logMetadataFile), 0750)).To(Succeed())

metadata := logMetadata{
Job: input.Deployment.Namespace + "/" + input.Deployment.Name,
Namespace: input.Deployment.Namespace,
App: input.Deployment.Name,
Job: input.Namespace + "/" + input.DeploymentName,
Namespace: input.Namespace,
App: input.DeploymentName,
Pod: pod.Name,
Container: container.Name,
NodeName: pod.Spec.NodeName,
Expand All @@ -161,7 +207,7 @@ func WatchDeploymentLogs(ctx context.Context, input WatchDeploymentLogsInput) {
go func(pod corev1.Pod, container corev1.Container) {
defer GinkgoRecover()

logFile := filepath.Clean(path.Join(input.LogPath, input.Deployment.Name, pod.Name, container.Name+".log"))
logFile := filepath.Clean(path.Join(input.LogPath, input.DeploymentName, pod.Name, container.Name+".log"))
Expect(os.MkdirAll(filepath.Dir(logFile), 0750)).To(Succeed())

f, err := os.OpenFile(logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
Expand All @@ -173,7 +219,7 @@ func WatchDeploymentLogs(ctx context.Context, input WatchDeploymentLogsInput) {
Follow: true,
}

podLogs, err := input.ClientSet.CoreV1().Pods(input.Deployment.Namespace).GetLogs(pod.Name, opts).Stream(ctx)
podLogs, err := input.ClientSet.CoreV1().Pods(input.Namespace).GetLogs(pod.Name, opts).Stream(ctx)
if err != nil {
// Failing to stream logs should not cause the test to fail
log.Logf("Error starting logs stream for pod %s, container %s: %v", klog.KRef(pod.Namespace, pod.Name), container.Name, err)
Expand All @@ -193,6 +239,19 @@ func WatchDeploymentLogs(ctx context.Context, input WatchDeploymentLogsInput) {
}
}

// logMetadata contains metadata about the logs.
// The format is very similar to the one used by promtail.
type logMetadata struct {
Job string `json:"job"`
Namespace string `json:"namespace"`
App string `json:"app"`
Pod string `json:"pod"`
Container string `json:"container"`
NodeName string `json:"node_name"`
Stream string `json:"stream"`
Labels map[string]string `json:"labels,omitempty"`
}

type WatchPodMetricsInput struct {
GetLister GetLister
ClientSet *kubernetes.Clientset
Expand All @@ -215,7 +274,7 @@ func WatchPodMetrics(ctx context.Context, input WatchPodMetricsInput) {
}, retryableOperationTimeout, retryableOperationInterval).Should(Succeed(), "Failed to get deployment %s", klog.KObj(input.Deployment))

selector, err := metav1.LabelSelectorAsMap(deployment.Spec.Selector)
Expect(err).NotTo(HaveOccurred(), "Failed to Pods selector for deployment %s", klog.KObj(input.Deployment))
Expect(err).NotTo(HaveOccurred(), "Failed to create Pods selector for deployment %s", klog.KObj(input.Deployment))

pods := &corev1.PodList{}
Eventually(func() error {
Expand Down

0 comments on commit 281297c

Please sign in to comment.