Add profiling data (cpu/heap) of cilium agent to sysdump. #1424

Merged · 1 commit · Mar 1, 2023
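In summary, this PR adds profiling collection to sysdump: a new `profiling` option (see the flag and `DefaultProfiling` below) gates a subtask that runs the gops `pprof-cpu` and `pprof-heap` commands against the cilium-agent process in each selected Cilium pod, copies the resulting profile files into the sysdump archive, and then removes the temporary files from the pods.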
3 changes: 3 additions & 0 deletions internal/cli/cmd/sysdump.go
@@ -74,6 +74,9 @@ func initSysdumpFlags(cmd *cobra.Command, options *sysdump.Options, optionPrefix
cmd.Flags().BoolVar(&options.Debug,
optionPrefix+"debug", sysdump.DefaultDebug,
"Whether to enable debug logging")
cmd.Flags().BoolVar(&options.Profiling,
optionPrefix+"profiling", sysdump.DefaultProfiling,
"Whether to enable scraping profiling data")
cmd.Flags().StringArrayVar(&options.ExtraLabelSelectors,
optionPrefix+"extra-label-selectors", nil,
"Optional set of labels selectors used to target additional pods for log collection.")
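Note that `DefaultProfiling` (added in `sysdump/defaults.go` below) is `true`, so profile scraping runs on every sysdump unless it is disabled explicitly — presumably `--profiling=false` for the standalone sysdump command, which passes an empty option prefix; the exact flag spelling depends on the `optionPrefix` the embedding command supplies.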
4 changes: 4 additions & 0 deletions sysdump/constants.go
@@ -113,6 +113,10 @@ var (
"stack",
"stats",
}
gopsProfiling = []string{
"pprof-heap",
"pprof-cpu",
}
)

var (
1 change: 1 addition & 0 deletions sysdump/defaults.go
@@ -28,6 +28,7 @@ const (
DefaultCiliumOperatorLabelSelector = "io.cilium/app=operator"
DefaultClustermeshApiserverLabelSelector = labelPrefix + "clustermesh-apiserver"
DefaultDebug = false
DefaultProfiling = true
DefaultHubbleLabelSelector = labelPrefix + "hubble"
DefaultHubbleFlowsCount = 10000
DefaultHubbleFlowsTimeout = 5 * time.Second
114 changes: 99 additions & 15 deletions sysdump/sysdump.go
@@ -48,6 +48,8 @@ type Options struct {
ClustermeshApiserverLabelSelector string
// Whether to enable debug logging.
Debug bool
// Whether to enable scraping profiling data.
Profiling bool
// The labels used to target additional pods
ExtraLabelSelectors []string
// The labels used to target Hubble pods.
@@ -866,6 +868,26 @@ func (c *Collector) Run() error {
return nil
},
},
{
CreatesSubtasks: true,
Description: "Collecting profiling data from Cilium pods",
Quick: false,
Task: func(ctx context.Context) error {
if !c.Options.Profiling {
return nil
}
p, err := c.Client.ListPods(ctx, c.Options.CiliumNamespace, metav1.ListOptions{
LabelSelector: c.Options.CiliumLabelSelector,
})
if err != nil {
return fmt.Errorf("failed to get profiling from Cilium pods: %w", err)
}
if err := c.SubmitProfilingGopsSubtasks(ctx, FilterPods(p, c.NodeList), ciliumAgentContainerName); err != nil {
return fmt.Errorf("failed to collect profiling data from Cilium pods: %w", err)
}
return nil
},
},
{
CreatesSubtasks: true,
Description: "Collecting logs from Cilium pods",
@@ -1364,6 +1386,19 @@ func extractGopsPID(output string) (string, error) {
return "", fmt.Errorf("failed to extract pid from output: %q", output)
}

// extractGopsProfileData parses the output of a gops pprof command and
// returns the path of the file the profile was saved to.
func extractGopsProfileData(output string) (string, error) {
prefix := "saved to: "
for _, line := range strings.Split(output, "\n") {
if strings.Contains(line, prefix) {
return strings.TrimSpace(strings.Split(line, prefix)[1]), nil
}
}
return "", fmt.Errorf("unable to find profile output file in gops output: %q", output)
}

func (c *Collector) SubmitCniConflistSubtask(ctx context.Context, pods []*corev1.Pod, containerName string) error {
for _, p := range pods {
p := p
@@ -1397,28 +1432,36 @@ func (c *Collector) SubmitCniConflistSubtask(ctx context.Context, pods []*corev1
return nil
}

// getGopsPID runs gops in the given container and resolves the PID of the gops-enabled agent process.
func (c *Collector) getGopsPID(ctx context.Context, pod *corev1.Pod, containerName string) (string, error) {
// Run 'gops' on the pod.
gopsOutput, err := c.Client.ExecInPod(ctx, pod.Namespace, pod.Name, containerName, []string{
gopsCommand,
})
if err != nil {
return "", fmt.Errorf("failed to list processes %q (%q) in namespace %q: %w", pod.Name, containerName, pod.Namespace, err)
}
agentPID := gopsPID
if c.Options.DetectGopsPID {
var err error
outputStr := gopsOutput.String()
agentPID, err = extractGopsPID(outputStr)
if err != nil {
return "", err
}
}
return agentPID, nil
}

// SubmitGopsSubtasks submits tasks to collect gops statistics from pods.
func (c *Collector) SubmitGopsSubtasks(ctx context.Context, pods []*corev1.Pod, containerName string) error {
for _, p := range pods {
p := p
for _, g := range gopsStats {
g := g
if err := c.Pool.Submit(fmt.Sprintf("gops-%s-%s", p.Name, g), func(ctx context.Context) error {
agentPID, err := c.getGopsPID(ctx, p, containerName)
if err != nil {
return err
}
o, err := c.Client.ExecInPod(ctx, p.Namespace, p.Name, containerName, []string{
gopsCommand,
@@ -1441,6 +1484,47 @@ func (c *Collector) SubmitGopsSubtasks(ctx context.Context, pods []*corev1.Pod,
return nil
}

// SubmitProfilingGopsSubtasks submits tasks to collect profiling data from pods.
func (c *Collector) SubmitProfilingGopsSubtasks(ctx context.Context, pods []*corev1.Pod, containerName string) error {
for _, p := range pods {
p := p
for _, g := range gopsProfiling {
g := g
if err := c.Pool.Submit(fmt.Sprintf("gops-%s-%s", p.Name, g), func(ctx context.Context) error {
agentPID, err := c.getGopsPID(ctx, p, containerName)
if err != nil {
return err
}
o, err := c.Client.ExecInPod(ctx, p.Namespace, p.Name, containerName, []string{
gopsCommand,
g,
agentPID,
})
if err != nil {
return fmt.Errorf("failed to collect gops profiling for %q (%q) in namespace %q: %w", p.Name, containerName, p.Namespace, err)
}
filePath, err := extractGopsProfileData(o.String())
if err != nil {
return fmt.Errorf("failed to collect gops profiling for %q (%q) in namespace %q: %w", p.Name, containerName, p.Namespace, err)
}
f := c.AbsoluteTempPath(fmt.Sprintf("%s-%s-<ts>.pprof", p.Name, g))
err = c.Client.CopyFromPod(ctx, p.Namespace, p.Name, containerName, filePath, f, c.Options.CopyRetryLimit)
if err != nil {
return fmt.Errorf("failed to collect gops profiling output for %q: %w", p.Name, err)
}
if _, err = c.Client.ExecInPod(ctx, p.Namespace, p.Name, containerName, []string{rmCommand, filePath}); err != nil {
c.logWarn("failed to delete profiling output from pod %q in namespace %q: %w", p.Name, p.Namespace, err)
return nil
}
return nil
}); err != nil {
return fmt.Errorf("failed to submit %s gops task for %q: %w", g, p.Name, err)
}
}
}
return nil
}

// SubmitLogsTasks submits tasks to collect kubernetes logs from pods.
func (c *Collector) SubmitLogsTasks(ctx context.Context, pods []*corev1.Pod, since time.Duration, limitBytes int64) error {
t := time.Now().Add(-since)
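For context on what the new `SubmitProfilingGopsSubtasks` task drives, the stand-alone sketch below mimics the same gops interaction outside Kubernetes: invoke one of the `gopsProfiling` subcommands against a gops-enabled process, then recover the profile path from the `saved to:` line — which is what `extractGopsProfileData` does before the collector copies the file out of the pod and deletes it there. This is illustrative code, not part of the PR; the local `gops` binary and the PID value are assumptions.

```go
package main

import (
	"fmt"
	"os/exec"
	"strings"
)

func main() {
	pid := "1" // hypothetical PID of a gops-enabled process

	// Equivalent of the "pprof-cpu" entry in gopsProfiling; "pprof-heap" works the same way.
	out, err := exec.Command("gops", "pprof-cpu", pid).CombinedOutput()
	if err != nil {
		fmt.Println("gops failed:", err)
		return
	}

	// gops prints e.g. "Profile dump saved to: /tmp/cpu_profile3302111893";
	// the collector parses this line to learn which file to copy out of the pod.
	const prefix = "saved to: "
	for _, line := range strings.Split(string(out), "\n") {
		if i := strings.Index(line, prefix); i >= 0 {
			fmt.Println("profile written to", strings.TrimSpace(line[i+len(prefix):]))
			return
		}
	}
	fmt.Println("no profile path found in gops output:", string(out))
}
```

In the sysdump archive the copied profiles are named per pod and profile type (`%s-%s-<ts>.pprof`, using the collector's timestamp placeholder) and can be opened with standard pprof tooling such as `go tool pprof`.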
13 changes: 13 additions & 0 deletions sysdump/sysdump_test.go
@@ -148,6 +148,19 @@ func (b *SysdumpSuite) TestExtractGopsPID(c *check.C) {

}

func (b *SysdumpSuite) TestExtractGopsProfileData(c *check.C) {
gopsOutput := `
Profiling CPU now, will take 30 secs...
Profile dump saved to: /tmp/cpu_profile3302111893
`
wantFilepath := "/tmp/cpu_profile3302111893"

gotFilepath, err := extractGopsProfileData(gopsOutput)
c.Assert(err, check.IsNil)
c.Assert(gotFilepath, check.Equals, wantFilepath)

}

func TestKVStoreTask(t *testing.T) {
assert := assert.New(t)
client := &fakeClient{