Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

k8s: remove unused filter functionality from k8sclient.CiliumLogs #2501

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions clustermesh/clustermesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"io"
"net"
"os"
"regexp"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -75,7 +74,7 @@ type k8sClusterMeshImplementation interface {
CreateCiliumExternalWorkload(ctx context.Context, cew *ciliumv2.CiliumExternalWorkload, opts metav1.CreateOptions) (*ciliumv2.CiliumExternalWorkload, error)
DeleteCiliumExternalWorkload(ctx context.Context, name string, opts metav1.DeleteOptions) error
ListCiliumEndpoints(ctx context.Context, namespace string, options metav1.ListOptions) (*ciliumv2.CiliumEndpointList, error)
CiliumLogs(ctx context.Context, namespace, pod string, since time.Time, filter *regexp.Regexp) (string, error)
CiliumLogs(ctx context.Context, namespace, pod string, since time.Time) (string, error)
}

type K8sClusterMesh struct {
Expand Down
2 changes: 1 addition & 1 deletion connectivity/check/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ func (t *Test) deletePolicies(ctx context.Context) error {
// filter is applied on each line of output.
func (t *Test) CiliumLogs(ctx context.Context) {
for _, pod := range t.Context().ciliumPods {
log, err := pod.K8sClient.CiliumLogs(ctx, pod.Pod.Namespace, pod.Pod.Name, t.startTime, nil)
log, err := pod.K8sClient.CiliumLogs(ctx, pod.Pod.Namespace, pod.Pod.Name, t.startTime)
if err != nil {
t.Fatalf("Error reading Cilium logs: %s", err)
}
Expand Down
56 changes: 10 additions & 46 deletions k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@
package k8s

import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/url"
"regexp"
"strings"
"time"

"github.com/blang/semver/v4"
"github.com/cilium/cilium/api/v1/models"
ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
ciliumv2alpha1 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
ciliumClientset "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned"
"github.com/cilium/cilium/pkg/versioncheck"
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/cli/output"
appsv1 "k8s.io/api/apps/v1"
Expand All @@ -41,12 +44,6 @@ import (
"k8s.io/client-go/rest"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"

"github.com/cilium/cilium/api/v1/models"
ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
ciliumv2alpha1 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
ciliumClientset "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned"
"github.com/cilium/cilium/pkg/versioncheck"

"github.com/cilium/cilium-cli/defaults"
)

Expand Down Expand Up @@ -282,7 +279,6 @@ func (c *Client) GetRaw(ctx context.Context, path string) (string, error) {
return "", err
}
return string(response), nil

}

func (c *Client) CreatePod(ctx context.Context, namespace string, pod *corev1.Pod, opts metav1.CreateOptions) (*corev1.Pod, error) {
Expand All @@ -305,12 +301,7 @@ func (c *Client) PodLogs(namespace, name string, opts *corev1.PodLogOptions) *re
return c.Clientset.CoreV1().Pods(namespace).GetLogs(name, opts)
}

// separator for locating the start of the next log message. Sometimes
// logs may span multiple lines, locate the timestamp, log level and
// msg that always start a new log message
var logSplitter = regexp.MustCompile(`\r?\n[^ ]+ level=[[:alpha:]]+ msg=`)

func (c *Client) CiliumLogs(ctx context.Context, namespace, pod string, since time.Time, filter *regexp.Regexp) (string, error) {
func (c *Client) CiliumLogs(ctx context.Context, namespace, pod string, since time.Time) (string, error) {
opts := &corev1.PodLogOptions{
Container: defaults.AgentContainerName,
Timestamps: true,
Expand All @@ -323,37 +314,12 @@ func (c *Client) CiliumLogs(ctx context.Context, namespace, pod string, since ti
}
defer podLogs.Close()

buf := new(bytes.Buffer)
scanner := bufio.NewScanner(podLogs)
scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}
// find the full log line separator
loc := logSplitter.FindIndex(data)
if loc != nil {
// Locate '\n', advance just past it
nl := loc[0] + bytes.IndexByte(data[loc[0]:loc[1]], '\n') + 1
return nl, data[:nl], nil
} else if atEOF {
// EOF, return all we have
return len(data), data, nil
}
// Nothing to return
return 0, nil, nil
})

for scanner.Scan() {
if filter != nil && !filter.Match(scanner.Bytes()) {
continue
}
buf.Write(scanner.Bytes())
}
err = scanner.Err()
log, err := io.ReadAll(podLogs)
if err != nil {
err = fmt.Errorf("error reading cilium-agent logs for %s/%s: %w", namespace, pod, err)
return "", fmt.Errorf("error reading log: %w", err)
}
return buf.String(), err

return string(log), nil
}

func (c *Client) ListServices(ctx context.Context, namespace string, options metav1.ListOptions) (*corev1.ServiceList, error) {
Expand Down Expand Up @@ -916,7 +882,6 @@ func (c *Client) GetHelmValues(_ context.Context, releaseName string, namespace
return "", fmt.Errorf("unable to parse helm values from release %s: %w", releaseName, err)
}
return valuesBuf.String(), nil

}

// GetHelmMetadata is the function for cilium cli sysdump to collect the helm metadata from the release directly
Expand All @@ -942,7 +907,6 @@ func (c *Client) GetHelmMetadata(_ context.Context, releaseName string, namespac
return "", fmt.Errorf("unable to parse helm metas from release %s: %w", releaseName, err)
}
return buf.String(), nil

}

// CreateEphemeralContainer will create a EphemeralContainer (debug container) in the specified pod.
Expand Down
11 changes: 5 additions & 6 deletions status/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"errors"
"fmt"
"net/http"
"regexp"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -70,7 +69,7 @@ type k8sImplementation interface {
GetDeployment(ctx context.Context, namespace, name string, options metav1.GetOptions) (*appsv1.Deployment, error)
ListPods(ctx context.Context, namespace string, options metav1.ListOptions) (*corev1.PodList, error)
ListCiliumEndpoints(ctx context.Context, namespace string, options metav1.ListOptions) (*ciliumv2.CiliumEndpointList, error)
CiliumLogs(ctx context.Context, namespace, pod string, since time.Time, filter *regexp.Regexp) (string, error)
CiliumLogs(ctx context.Context, namespace, pod string, since time.Time) (string, error)
}

func NewK8sStatusCollector(client k8sImplementation, params K8sStatusParameters) (*K8sStatusCollector, error) {
Expand Down Expand Up @@ -544,10 +543,11 @@ func (k *K8sStatusCollector) status(ctx context.Context) *Status {
}
status.HelmChartVersion = release.Chart.Metadata.Version
return nil
}})
},
})

// for the sake of sanity, don't get pod logs more than once
var agentLogsOnce = sync.Once{}
agentLogsOnce := sync.Once{}
err := k.podStatus(ctx, status, defaults.AgentDaemonSetName, defaults.AgentPodSelector, func(_ context.Context, status *Status, name string, pod *corev1.Pod) {
if pod.Status.Phase == corev1.PodRunning {
// extract container status
Expand Down Expand Up @@ -591,7 +591,7 @@ func (k *K8sStatusCollector) status(ctx context.Context) *Status {
dyingGasp = strings.TrimSpace(terminated.Message)
} else {
agentLogsOnce.Do(func() { // in a sync.Once so we don't waste time retrieving lots of logs
logs, err := k.client.CiliumLogs(ctx, pod.Namespace, pod.Name, terminated.FinishedAt.Time.Add(-2*time.Minute), nil)
logs, err := k.client.CiliumLogs(ctx, pod.Namespace, pod.Name, terminated.FinishedAt.Time.Add(-2*time.Minute))
if err == nil && logs != "" {
dyingGasp = strings.TrimSpace(logs)
}
Expand Down Expand Up @@ -621,7 +621,6 @@ func (k *K8sStatusCollector) status(ctx context.Context) *Status {
})
}
})

if err != nil {
status.CollectionError(err)
}
Expand Down
3 changes: 1 addition & 2 deletions status/k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package status
import (
"context"
"fmt"
"regexp"
"testing"
"time"

Expand Down Expand Up @@ -144,7 +143,7 @@ func (c *k8sStatusMockClient) ListCiliumEndpoints(_ context.Context, _ string, o
return c.ciliumEndpointList[options.LabelSelector], nil
}

func (c *k8sStatusMockClient) CiliumLogs(_ context.Context, _, _ string, _ time.Time, _ *regexp.Regexp) (string, error) {
func (c *k8sStatusMockClient) CiliumLogs(_ context.Context, _, _ string, _ time.Time) (string, error) {
return "[error] a sample cilium-agent error message", nil
}

Expand Down
Loading