Skip to content

Commit

Permalink
k8s: remove unused filter functionality from k8sclient.CiliumLogs
Browse files Browse the repository at this point in the history
Currently, the logic to retrieve all logs from Cilium Pods is implemented using
`bufio.Scanner`. This might result in errors for long  loglines.

```
Error reading Cilium logs: error reading cilium-agent logs for kube-system/cilium-c2sx7: bufio.Scanner: token too long
```

The only reason to use the `bufio.Scanner` is the support to filter the logs by
a given regular expression. But this functionality is not used.

Therefore, this commit removes the filter functionality and the usage of `bufio.Scanner`.

Signed-off-by: Marco Hofstetter <[email protected]>
  • Loading branch information
mhofstetter committed Apr 24, 2024
1 parent c29afa4 commit 052a903
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 57 deletions.
3 changes: 1 addition & 2 deletions clustermesh/clustermesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"io"
"net"
"os"
"regexp"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -75,7 +74,7 @@ type k8sClusterMeshImplementation interface {
CreateCiliumExternalWorkload(ctx context.Context, cew *ciliumv2.CiliumExternalWorkload, opts metav1.CreateOptions) (*ciliumv2.CiliumExternalWorkload, error)
DeleteCiliumExternalWorkload(ctx context.Context, name string, opts metav1.DeleteOptions) error
ListCiliumEndpoints(ctx context.Context, namespace string, options metav1.ListOptions) (*ciliumv2.CiliumEndpointList, error)
CiliumLogs(ctx context.Context, namespace, pod string, since time.Time, filter *regexp.Regexp) (string, error)
CiliumLogs(ctx context.Context, namespace, pod string, since time.Time) (string, error)
}

type K8sClusterMesh struct {
Expand Down
2 changes: 1 addition & 1 deletion connectivity/check/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ func (t *Test) deletePolicies(ctx context.Context) error {
// filter is applied on each line of output.
func (t *Test) CiliumLogs(ctx context.Context) {
for _, pod := range t.Context().ciliumPods {
log, err := pod.K8sClient.CiliumLogs(ctx, pod.Pod.Namespace, pod.Pod.Name, t.startTime, nil)
log, err := pod.K8sClient.CiliumLogs(ctx, pod.Pod.Namespace, pod.Pod.Name, t.startTime)
if err != nil {
t.Fatalf("Error reading Cilium logs: %s", err)
}
Expand Down
56 changes: 10 additions & 46 deletions k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@
package k8s

import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/url"
"regexp"
"strings"
"time"

"github.com/blang/semver/v4"
"github.com/cilium/cilium/api/v1/models"
ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
ciliumv2alpha1 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
ciliumClientset "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned"
"github.com/cilium/cilium/pkg/versioncheck"
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/cli/output"
appsv1 "k8s.io/api/apps/v1"
Expand All @@ -41,12 +44,6 @@ import (
"k8s.io/client-go/rest"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"

"github.com/cilium/cilium/api/v1/models"
ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
ciliumv2alpha1 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
ciliumClientset "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned"
"github.com/cilium/cilium/pkg/versioncheck"

"github.com/cilium/cilium-cli/defaults"
)

Expand Down Expand Up @@ -282,7 +279,6 @@ func (c *Client) GetRaw(ctx context.Context, path string) (string, error) {
return "", err
}
return string(response), nil

}

func (c *Client) CreatePod(ctx context.Context, namespace string, pod *corev1.Pod, opts metav1.CreateOptions) (*corev1.Pod, error) {
Expand All @@ -305,12 +301,7 @@ func (c *Client) PodLogs(namespace, name string, opts *corev1.PodLogOptions) *re
return c.Clientset.CoreV1().Pods(namespace).GetLogs(name, opts)
}

// separator for locating the start of the next log message. Sometimes
// logs may span multiple lines, locate the timestamp, log level and
// msg that always start a new log message
var logSplitter = regexp.MustCompile(`\r?\n[^ ]+ level=[[:alpha:]]+ msg=`)

func (c *Client) CiliumLogs(ctx context.Context, namespace, pod string, since time.Time, filter *regexp.Regexp) (string, error) {
func (c *Client) CiliumLogs(ctx context.Context, namespace, pod string, since time.Time) (string, error) {
opts := &corev1.PodLogOptions{
Container: defaults.AgentContainerName,
Timestamps: true,
Expand All @@ -323,37 +314,12 @@ func (c *Client) CiliumLogs(ctx context.Context, namespace, pod string, since ti
}
defer podLogs.Close()

buf := new(bytes.Buffer)
scanner := bufio.NewScanner(podLogs)
scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}
// find the full log line separator
loc := logSplitter.FindIndex(data)
if loc != nil {
// Locate '\n', advance just past it
nl := loc[0] + bytes.IndexByte(data[loc[0]:loc[1]], '\n') + 1
return nl, data[:nl], nil
} else if atEOF {
// EOF, return all we have
return len(data), data, nil
}
// Nothing to return
return 0, nil, nil
})

for scanner.Scan() {
if filter != nil && !filter.Match(scanner.Bytes()) {
continue
}
buf.Write(scanner.Bytes())
}
err = scanner.Err()
log, err := io.ReadAll(podLogs)
if err != nil {
err = fmt.Errorf("error reading cilium-agent logs for %s/%s: %w", namespace, pod, err)
return "", fmt.Errorf("error reading log: %w", err)
}
return buf.String(), err

return string(log), err
}

func (c *Client) ListServices(ctx context.Context, namespace string, options metav1.ListOptions) (*corev1.ServiceList, error) {
Expand Down Expand Up @@ -916,7 +882,6 @@ func (c *Client) GetHelmValues(_ context.Context, releaseName string, namespace
return "", fmt.Errorf("unable to parse helm values from release %s: %w", releaseName, err)
}
return valuesBuf.String(), nil

}

// GetHelmMetadata is the function for cilium cli sysdump to collect the helm metadata from the release directly
Expand All @@ -942,7 +907,6 @@ func (c *Client) GetHelmMetadata(_ context.Context, releaseName string, namespac
return "", fmt.Errorf("unable to parse helm metas from release %s: %w", releaseName, err)
}
return buf.String(), nil

}

// CreateEphemeralContainer will create a EphemeralContainer (debug container) in the specified pod.
Expand Down
11 changes: 5 additions & 6 deletions status/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"errors"
"fmt"
"net/http"
"regexp"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -70,7 +69,7 @@ type k8sImplementation interface {
GetDeployment(ctx context.Context, namespace, name string, options metav1.GetOptions) (*appsv1.Deployment, error)
ListPods(ctx context.Context, namespace string, options metav1.ListOptions) (*corev1.PodList, error)
ListCiliumEndpoints(ctx context.Context, namespace string, options metav1.ListOptions) (*ciliumv2.CiliumEndpointList, error)
CiliumLogs(ctx context.Context, namespace, pod string, since time.Time, filter *regexp.Regexp) (string, error)
CiliumLogs(ctx context.Context, namespace, pod string, since time.Time) (string, error)
}

func NewK8sStatusCollector(client k8sImplementation, params K8sStatusParameters) (*K8sStatusCollector, error) {
Expand Down Expand Up @@ -544,10 +543,11 @@ func (k *K8sStatusCollector) status(ctx context.Context) *Status {
}
status.HelmChartVersion = release.Chart.Metadata.Version
return nil
}})
},
})

// for the sake of sanity, don't get pod logs more than once
var agentLogsOnce = sync.Once{}
agentLogsOnce := sync.Once{}
err := k.podStatus(ctx, status, defaults.AgentDaemonSetName, defaults.AgentPodSelector, func(_ context.Context, status *Status, name string, pod *corev1.Pod) {
if pod.Status.Phase == corev1.PodRunning {
// extract container status
Expand Down Expand Up @@ -591,7 +591,7 @@ func (k *K8sStatusCollector) status(ctx context.Context) *Status {
dyingGasp = strings.TrimSpace(terminated.Message)
} else {
agentLogsOnce.Do(func() { // in a sync.Once so we don't waste time retrieving lots of logs
logs, err := k.client.CiliumLogs(ctx, pod.Namespace, pod.Name, terminated.FinishedAt.Time.Add(-2*time.Minute), nil)
logs, err := k.client.CiliumLogs(ctx, pod.Namespace, pod.Name, terminated.FinishedAt.Time.Add(-2*time.Minute))
if err == nil && logs != "" {
dyingGasp = strings.TrimSpace(logs)
}
Expand Down Expand Up @@ -621,7 +621,6 @@ func (k *K8sStatusCollector) status(ctx context.Context) *Status {
})
}
})

if err != nil {
status.CollectionError(err)
}
Expand Down
3 changes: 1 addition & 2 deletions status/k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package status
import (
"context"
"fmt"
"regexp"
"testing"
"time"

Expand Down Expand Up @@ -144,7 +143,7 @@ func (c *k8sStatusMockClient) ListCiliumEndpoints(_ context.Context, _ string, o
return c.ciliumEndpointList[options.LabelSelector], nil
}

func (c *k8sStatusMockClient) CiliumLogs(_ context.Context, _, _ string, _ time.Time, _ *regexp.Regexp) (string, error) {
func (c *k8sStatusMockClient) CiliumLogs(_ context.Context, _, _ string, _ time.Time) (string, error) {
return "[error] a sample cilium-agent error message", nil
}

Expand Down

0 comments on commit 052a903

Please sign in to comment.