From 0698b56b4857ca44dbf746b8e28e8ca07f88af81 Mon Sep 17 00:00:00 2001 From: Marco Hofstetter Date: Wed, 24 Apr 2024 20:32:52 +0200 Subject: [PATCH] k8s: remove unused filter functionality from k8sclient.CiliumLogs Currently, the logic to retrieve all logs from Cilium Pods is implemented using `bufio.Scanner`. This might result in errors for long loglines. ``` Error reading Cilium logs: error reading cilium-agent logs for kube-system/cilium-c2sx7: bufio.Scanner: token too long ``` The only reason to use the `bufio.Scanner` is the support to filter the logs by a given regular expression. But this functionality is not used. Therefore, this commit removes the filter functionality and the usage of `bufio.Scanner`. Signed-off-by: Marco Hofstetter --- clustermesh/clustermesh.go | 3 +- connectivity/check/policy.go | 2 +- k8s/client.go | 56 +++++++----------------------------- status/k8s.go | 11 ++++--- status/k8s_test.go | 3 +- 5 files changed, 18 insertions(+), 57 deletions(-) diff --git a/clustermesh/clustermesh.go b/clustermesh/clustermesh.go index a0961f62ce..dff551b0da 100644 --- a/clustermesh/clustermesh.go +++ b/clustermesh/clustermesh.go @@ -15,7 +15,6 @@ import ( "io" "net" "os" - "regexp" "sort" "strconv" "strings" @@ -75,7 +74,7 @@ type k8sClusterMeshImplementation interface { CreateCiliumExternalWorkload(ctx context.Context, cew *ciliumv2.CiliumExternalWorkload, opts metav1.CreateOptions) (*ciliumv2.CiliumExternalWorkload, error) DeleteCiliumExternalWorkload(ctx context.Context, name string, opts metav1.DeleteOptions) error ListCiliumEndpoints(ctx context.Context, namespace string, options metav1.ListOptions) (*ciliumv2.CiliumEndpointList, error) - CiliumLogs(ctx context.Context, namespace, pod string, since time.Time, filter *regexp.Regexp) (string, error) + CiliumLogs(ctx context.Context, namespace, pod string, since time.Time) (string, error) } type K8sClusterMesh struct { diff --git a/connectivity/check/policy.go b/connectivity/check/policy.go index 9a4cc04fd2..43f169b661 100644 --- a/connectivity/check/policy.go +++ b/connectivity/check/policy.go @@ -657,7 +657,7 @@ func (t *Test) deletePolicies(ctx context.Context) error { // filter is applied on each line of output. func (t *Test) CiliumLogs(ctx context.Context) { for _, pod := range t.Context().ciliumPods { - log, err := pod.K8sClient.CiliumLogs(ctx, pod.Pod.Namespace, pod.Pod.Name, t.startTime, nil) + log, err := pod.K8sClient.CiliumLogs(ctx, pod.Pod.Namespace, pod.Pod.Name, t.startTime) if err != nil { t.Fatalf("Error reading Cilium logs: %s", err) } diff --git a/k8s/client.go b/k8s/client.go index edee0cfbb3..357dc5c691 100644 --- a/k8s/client.go +++ b/k8s/client.go @@ -4,7 +4,6 @@ package k8s import ( - "bufio" "bytes" "context" "encoding/json" @@ -12,11 +11,15 @@ import ( "io" "net" "net/url" - "regexp" "strings" "time" "github.com/blang/semver/v4" + "github.com/cilium/cilium/api/v1/models" + ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" + ciliumv2alpha1 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1" + ciliumClientset "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned" + "github.com/cilium/cilium/pkg/versioncheck" "helm.sh/helm/v3/pkg/action" "helm.sh/helm/v3/pkg/cli/output" appsv1 "k8s.io/api/apps/v1" @@ -41,12 +44,6 @@ import ( "k8s.io/client-go/rest" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" - "github.com/cilium/cilium/api/v1/models" - ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" - ciliumv2alpha1 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1" - ciliumClientset "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned" - "github.com/cilium/cilium/pkg/versioncheck" - "github.com/cilium/cilium-cli/defaults" ) @@ -282,7 +279,6 @@ func (c *Client) GetRaw(ctx context.Context, path string) (string, error) { return "", err } return string(response), nil - } func (c *Client) CreatePod(ctx context.Context, namespace string, pod *corev1.Pod, opts metav1.CreateOptions) (*corev1.Pod, error) { @@ -305,12 +301,7 @@ func (c *Client) PodLogs(namespace, name string, opts *corev1.PodLogOptions) *re return c.Clientset.CoreV1().Pods(namespace).GetLogs(name, opts) } -// separator for locating the start of the next log message. Sometimes -// logs may span multiple lines, locate the timestamp, log level and -// msg that always start a new log message -var logSplitter = regexp.MustCompile(`\r?\n[^ ]+ level=[[:alpha:]]+ msg=`) - -func (c *Client) CiliumLogs(ctx context.Context, namespace, pod string, since time.Time, filter *regexp.Regexp) (string, error) { +func (c *Client) CiliumLogs(ctx context.Context, namespace, pod string, since time.Time) (string, error) { opts := &corev1.PodLogOptions{ Container: defaults.AgentContainerName, Timestamps: true, @@ -323,37 +314,12 @@ func (c *Client) CiliumLogs(ctx context.Context, namespace, pod string, since ti } defer podLogs.Close() - buf := new(bytes.Buffer) - scanner := bufio.NewScanner(podLogs) - scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) { - if atEOF && len(data) == 0 { - return 0, nil, nil - } - // find the full log line separator - loc := logSplitter.FindIndex(data) - if loc != nil { - // Locate '\n', advance just past it - nl := loc[0] + bytes.IndexByte(data[loc[0]:loc[1]], '\n') + 1 - return nl, data[:nl], nil - } else if atEOF { - // EOF, return all we have - return len(data), data, nil - } - // Nothing to return - return 0, nil, nil - }) - - for scanner.Scan() { - if filter != nil && !filter.Match(scanner.Bytes()) { - continue - } - buf.Write(scanner.Bytes()) - } - err = scanner.Err() + log, err := io.ReadAll(podLogs) if err != nil { - err = fmt.Errorf("error reading cilium-agent logs for %s/%s: %w", namespace, pod, err) + return "", fmt.Errorf("error reading log: %w", err) } - return buf.String(), err + + return string(log), nil } func (c *Client) ListServices(ctx context.Context, namespace string, options metav1.ListOptions) (*corev1.ServiceList, error) { @@ -916,7 +882,6 @@ func (c *Client) GetHelmValues(_ context.Context, releaseName string, namespace return "", fmt.Errorf("unable to parse helm values from release %s: %w", releaseName, err) } return valuesBuf.String(), nil - } // GetHelmMetadata is the function for cilium cli sysdump to collect the helm metadata from the release directly @@ -942,7 +907,6 @@ func (c *Client) GetHelmMetadata(_ context.Context, releaseName string, namespac return "", fmt.Errorf("unable to parse helm metas from release %s: %w", releaseName, err) } return buf.String(), nil - } // CreateEphemeralContainer will create a EphemeralContainer (debug container) in the specified pod. diff --git a/status/k8s.go b/status/k8s.go index 69626f9bfe..01b63fba0a 100644 --- a/status/k8s.go +++ b/status/k8s.go @@ -8,7 +8,6 @@ import ( "errors" "fmt" "net/http" - "regexp" "strings" "sync" "time" @@ -70,7 +69,7 @@ type k8sImplementation interface { GetDeployment(ctx context.Context, namespace, name string, options metav1.GetOptions) (*appsv1.Deployment, error) ListPods(ctx context.Context, namespace string, options metav1.ListOptions) (*corev1.PodList, error) ListCiliumEndpoints(ctx context.Context, namespace string, options metav1.ListOptions) (*ciliumv2.CiliumEndpointList, error) - CiliumLogs(ctx context.Context, namespace, pod string, since time.Time, filter *regexp.Regexp) (string, error) + CiliumLogs(ctx context.Context, namespace, pod string, since time.Time) (string, error) } func NewK8sStatusCollector(client k8sImplementation, params K8sStatusParameters) (*K8sStatusCollector, error) { @@ -544,10 +543,11 @@ func (k *K8sStatusCollector) status(ctx context.Context) *Status { } status.HelmChartVersion = release.Chart.Metadata.Version return nil - }}) + }, + }) // for the sake of sanity, don't get pod logs more than once - var agentLogsOnce = sync.Once{} + agentLogsOnce := sync.Once{} err := k.podStatus(ctx, status, defaults.AgentDaemonSetName, defaults.AgentPodSelector, func(_ context.Context, status *Status, name string, pod *corev1.Pod) { if pod.Status.Phase == corev1.PodRunning { // extract container status @@ -591,7 +591,7 @@ func (k *K8sStatusCollector) status(ctx context.Context) *Status { dyingGasp = strings.TrimSpace(terminated.Message) } else { agentLogsOnce.Do(func() { // in a sync.Once so we don't waste time retrieving lots of logs - logs, err := k.client.CiliumLogs(ctx, pod.Namespace, pod.Name, terminated.FinishedAt.Time.Add(-2*time.Minute), nil) + logs, err := k.client.CiliumLogs(ctx, pod.Namespace, pod.Name, terminated.FinishedAt.Time.Add(-2*time.Minute)) if err == nil && logs != "" { dyingGasp = strings.TrimSpace(logs) } @@ -621,7 +621,6 @@ func (k *K8sStatusCollector) status(ctx context.Context) *Status { }) } }) - if err != nil { status.CollectionError(err) } diff --git a/status/k8s_test.go b/status/k8s_test.go index 01e679dd28..0b66a98998 100644 --- a/status/k8s_test.go +++ b/status/k8s_test.go @@ -6,7 +6,6 @@ package status import ( "context" "fmt" - "regexp" "testing" "time" @@ -144,7 +143,7 @@ func (c *k8sStatusMockClient) ListCiliumEndpoints(_ context.Context, _ string, o return c.ciliumEndpointList[options.LabelSelector], nil } -func (c *k8sStatusMockClient) CiliumLogs(_ context.Context, _, _ string, _ time.Time, _ *regexp.Regexp) (string, error) { +func (c *k8sStatusMockClient) CiliumLogs(_ context.Context, _, _ string, _ time.Time) (string, error) { return "[error] a sample cilium-agent error message", nil }