From 2b4e35445c062750786ba5ad977250fdae5296aa Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 14 Dec 2021 16:51:44 -0800 Subject: [PATCH] elastic-agent diagnostics pprof (#28798) (#29429) * Allow -httpprof to bind to sockets/pipes * Enable pprof debug endpoint on socket for agent and beats Force the elastic-agent and all beats that it starts to run the http/pprof listener on a local socket. * Add new Pprof command to control.proto * Add pprof option to diagnostics collect * Fix linting issues * Add diagonstics pprof command allow pprof to collect from agent * Revert debug socket changes * Cleanup timeout handling Change pprof timeouts from 2*pprofDur to 30s+pprofDur. Remove timeouts from the socket requester client as cancellations for long running requests will be handled by the passed ctx. * Fix linting issue add timeout flag Fix linting issues with new command. Add a timeout flag for when pprof info is gathered. Flag will let users specify the command timeout value. This value whould be greater then the pprof-duration as it needs to gather and process pprof data. * Add more command help text. * Add CHANGELOG * move spec collection for routes to fn * add monitoringCfg reference to control server * elastic-agent server only processes pprof requests when enabled * Fix error message fix commands only on elastic-agent * Add pprof fleet.yml, fix nil reference * Change pprof setting name to monitoring.pprof.enabled Chagne the setting in elastic agent from agent.monioring.pprof to agent.monitoring.pprof.enabled so that policy updates (such as the one that occurs when the agent is starting in fleet mode) do not use the default false value if the user has injected the ssetting into fleet.yml (cherry picked from commit f5e0ec4445a7f9ba2860899e06ddaca16a96ceb0) Co-authored-by: Michel Laterman <82832767+michel-laterman@users.noreply.github.com> --- x-pack/elastic-agent/CHANGELOG.next.asciidoc | 2 + .../_meta/config/common.p2.yml.tmpl | 2 +- .../_meta/config/common.reference.p2.yml.tmpl | 2 +- .../config/elastic-agent.docker.yml.tmpl | 2 +- x-pack/elastic-agent/control.proto | 42 ++ x-pack/elastic-agent/elastic-agent.docker.yml | 2 +- .../elastic-agent/elastic-agent.reference.yml | 2 +- x-pack/elastic-agent/elastic-agent.yml | 2 +- .../handlers/handler_action_policy_change.go | 7 +- .../pkg/agent/cmd/diagnostics.go | 200 ++++++- x-pack/elastic-agent/pkg/agent/cmd/run.go | 3 +- .../pkg/agent/control/client/client.go | 36 ++ .../pkg/agent/control/proto/control.pb.go | 513 +++++++++++++++--- .../pkg/agent/control/server/server.go | 352 +++++++++--- .../core/monitoring/beats/beats_monitor.go | 6 +- .../pkg/core/monitoring/config/config.go | 10 +- 16 files changed, 1031 insertions(+), 152 deletions(-) diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index 9d4a5932ad4..2c342d2dd3d 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -149,6 +149,8 @@ - Add diagnostics command to gather beat metadata. {pull}28265[28265] - Add diagnostics collect command to gather beat metadata, config, policy, and logs and bundle it into an archive. {pull}28461[28461] - Add `KIBANA_FLEET_SERVICE_TOKEN` to Elastic Agent container. {pull}28096[28096] +- Enable pprof endpoints for beats processes. Allow pprof endpoints for elastic-agent if enabled. {pull}28983[28983] +- Add `--pprof` flag to `elastic-agent diagnostics` and an `elastic-agent pprof` command to allow operators to gather pprof data from the agent and beats running under it. {pull}28798[28798] - Allow pprof endpoints for elastic-agent or beats if enabled. {pull}28983[28983] {pull}29155[29155] - Add --fleet-server-es-ca-trusted-fingerprint flag to allow agent/fleet-server to work with elasticsearch clusters using self signed certs. {pull}29128[29128] - Discover changes in Kubernetes nodes metadata as soon as they happen. {pull}23139[23139] diff --git a/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl b/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl index de16df8ea7f..e8f4c31e8e1 100644 --- a/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl +++ b/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl @@ -35,7 +35,7 @@ inputs: # metrics: true # # exposes /debug/pprof/ endpoints # # recommended that these endpoints are only enabled if the monitoring endpoint is set to localhost -# pprof: false +# pprof.enabled: false # # exposes agent metrics using http, by default sockets and named pipes are used # http: # # enables http endpoint diff --git a/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl b/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl index 43e48464630..8a3ef077357 100644 --- a/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl +++ b/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl @@ -109,7 +109,7 @@ inputs: # metrics: false # # exposes /debug/pprof/ endpoints # # recommended that these endpoints are only enabled if the monitoring endpoint is set to localhost -# pprof: false +# pprof.enabled: false # # exposes agent metrics using http, by default sockets and named pipes are used # http: # # enables http endpoint diff --git a/x-pack/elastic-agent/_meta/config/elastic-agent.docker.yml.tmpl b/x-pack/elastic-agent/_meta/config/elastic-agent.docker.yml.tmpl index 69a80678db8..17201aa6dce 100644 --- a/x-pack/elastic-agent/_meta/config/elastic-agent.docker.yml.tmpl +++ b/x-pack/elastic-agent/_meta/config/elastic-agent.docker.yml.tmpl @@ -109,7 +109,7 @@ inputs: # metrics: false # # exposes /debug/pprof/ endpoints # # recommended that these endpoints are only enabled if the monitoring endpoint is set to localhost -# pprof: false +# pprof.enabled: false # # exposes agent metrics using http, by default sockets and named pipes are used # http: # # enables http endpoint diff --git a/x-pack/elastic-agent/control.proto b/x-pack/elastic-agent/control.proto index 26b6552c395..53168f872ba 100644 --- a/x-pack/elastic-agent/control.proto +++ b/x-pack/elastic-agent/control.proto @@ -29,6 +29,19 @@ enum ActionStatus { FAILURE = 1; } +// pprof endpoint that can be requested. +enum PprofOption { + ALLOCS = 0; + BLOCK = 1; + CMDLINE = 2; + GOROUTINE = 3; + HEAP = 4; + MUTEX = 5; + PROFILE = 6; + THREADCREATE = 7; + TRACE = 8; +} + // Empty message. message Empty { } @@ -128,6 +141,32 @@ message ProcMetaResponse { repeated ProcMeta procs = 1; } +// PprofRequest is a request for pprof data from and http/pprof endpoint. +message PprofRequest { + // The profiles that are requested + repeated PprofOption pprofType = 1; + // A string representing a time.Duration to apply to trace, and profile options. + string traceDuration = 2; + // The application that will be profiled, if empty all applications are profiled. + string appName = 3; + // The route key to match for profiling, if empty all are profiled. + string routeKey = 4; +} + +// PprofResult is the result of a pprof request for a given application/route key. +message PprofResult { + string appName = 1; + string routeKey = 2; + PprofOption pprofType = 3; + bytes result = 4; + string error = 5; +} + +// PprofResponse is a wrapper to return all pprof responses. +message PprofResponse { + repeated PprofResult results = 1; +} + service ElasticAgentControl { // Fetches the currently running version of the Elastic Agent. rpc Version(Empty) returns (VersionResponse); @@ -143,4 +182,7 @@ service ElasticAgentControl { // Gather all running process metadata. rpc ProcMeta(Empty) returns (ProcMetaResponse); + + // Gather requested pprof data from specified applications. + rpc Pprof(PprofRequest) returns (PprofResponse); } diff --git a/x-pack/elastic-agent/elastic-agent.docker.yml b/x-pack/elastic-agent/elastic-agent.docker.yml index 9bf7307aacf..b7d5ff2017e 100644 --- a/x-pack/elastic-agent/elastic-agent.docker.yml +++ b/x-pack/elastic-agent/elastic-agent.docker.yml @@ -109,7 +109,7 @@ inputs: # metrics: false # # exposes /debug/pprof/ endpoints # # recommended that these endpoints are only enabled if the monitoring endpoint is set to localhost -# pprof: false +# pprof.enabled: false # # exposes agent metrics using http, by default sockets and named pipes are used # http: # # enables http endpoint diff --git a/x-pack/elastic-agent/elastic-agent.reference.yml b/x-pack/elastic-agent/elastic-agent.reference.yml index 67922a1d89c..da04df95ea8 100644 --- a/x-pack/elastic-agent/elastic-agent.reference.yml +++ b/x-pack/elastic-agent/elastic-agent.reference.yml @@ -115,7 +115,7 @@ inputs: # metrics: false # # exposes /debug/pprof/ endpoints # # recommended that these endpoints are only enabled if the monitoring endpoint is set to localhost -# pprof: false +# pprof.enabled: false # # exposes agent metrics using http, by default sockets and named pipes are used # http: # # enables http endpoint diff --git a/x-pack/elastic-agent/elastic-agent.yml b/x-pack/elastic-agent/elastic-agent.yml index d40b6518e8d..802df992ba7 100644 --- a/x-pack/elastic-agent/elastic-agent.yml +++ b/x-pack/elastic-agent/elastic-agent.yml @@ -41,7 +41,7 @@ inputs: # metrics: true # # exposes /debug/pprof/ endpoints # # recommended that these endpoints are only enabled if the monitoring endpoint is set to localhost -# pprof: false +# pprof.enabled: false # # exposes agent metrics using http, by default sockets and named pipes are used # http: # # enables http endpoint diff --git a/x-pack/elastic-agent/pkg/agent/application/pipeline/actions/handlers/handler_action_policy_change.go b/x-pack/elastic-agent/pkg/agent/application/pipeline/actions/handlers/handler_action_policy_change.go index f14ec7aea81..e00ccfc844b 100644 --- a/x-pack/elastic-agent/pkg/agent/application/pipeline/actions/handlers/handler_action_policy_change.go +++ b/x-pack/elastic-agent/pkg/agent/application/pipeline/actions/handlers/handler_action_policy_change.go @@ -197,9 +197,10 @@ func fleetToReader(agentInfo *info.AgentInfo, cfg *configuration.Configuration) configToStore := map[string]interface{}{ "fleet": cfg.Fleet, "agent": map[string]interface{}{ - "id": agentInfo.AgentID(), - "logging.level": cfg.Settings.LoggingConfig.Level, - "monitoring.http": cfg.Settings.MonitoringConfig.HTTP, + "id": agentInfo.AgentID(), + "logging.level": cfg.Settings.LoggingConfig.Level, + "monitoring.http": cfg.Settings.MonitoringConfig.HTTP, + "monitoring.pprof": cfg.Settings.MonitoringConfig.Pprof, }, } diff --git a/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go b/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go index e90e4ab13c1..b32edf6df2d 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/proto" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config/operations" @@ -63,6 +64,7 @@ func newDiagnosticsCommand(s []string, streams *cli.IOStreams) *cobra.Command { cmd.Flags().String("output", "human", "Output the diagnostics information in either human, json, or yaml (default: human)") cmd.AddCommand(newDiagnosticsCollectCommandWithArgs(s, streams)) + cmd.AddCommand(newDiagnosticsPprofCommandWithArgs(s, streams)) return cmd } @@ -72,7 +74,7 @@ func newDiagnosticsCollectCommandWithArgs(_ []string, streams *cli.IOStreams) *c Use: "collect", Short: "Collect diagnostics information from the elastic-agent and write it to a zip archive.", Long: "Collect diagnostics information from the elastic-agent and write it to a zip archive.\nNote that any credentials will appear in plain text.", - Args: cobra.MaximumNArgs(1), + Args: cobra.MaximumNArgs(3), RunE: func(c *cobra.Command, args []string) error { file, _ := c.Flags().GetString("file") @@ -89,12 +91,58 @@ func newDiagnosticsCollectCommandWithArgs(_ []string, streams *cli.IOStreams) *c return fmt.Errorf("unsupported output: %s", output) } - return diagnosticsCollectCmd(streams, file, output) + pprof, _ := c.Flags().GetBool("pprof") + d, _ := c.Flags().GetDuration("pprof-duration") + // get the command timeout value only if one is set explicitly. + // otherwise a value of 30s + pprof-duration will be used. + var timeout time.Duration + if c.Flags().Changed("timeout") { + timeout, _ = c.Flags().GetDuration("timeout") + } + + return diagnosticsCollectCmd(streams, file, output, pprof, d, timeout) }, } cmd.Flags().StringP("file", "f", "", "name of the output diagnostics zip archive") cmd.Flags().String("output", "yaml", "Output the collected information in either json, or yaml (default: yaml)") // replace output flag with different options + cmd.Flags().Bool("pprof", false, "Collect all pprof data from all running applications.") + cmd.Flags().Duration("pprof-duration", time.Second*30, "The duration to collect trace and profiling data from the debug/pprof endpoints. (default: 30s)") + cmd.Flags().Duration("timeout", time.Second*30, "The timeout for the diagnostics collect command, will be either 30s or 30s+pprof-duration by default. Should be longer then pprof-duration when pprof is enabled as the command needs time to process/archive the response.") + + return cmd +} + +func newDiagnosticsPprofCommandWithArgs(_ []string, streams *cli.IOStreams) *cobra.Command { + cmd := &cobra.Command{ + Use: "pprof", + Short: "Collect pprof information from a running process.", + Long: "Collect pprof information from the elastic-agent or one of its processes and write to stdout or a file.\nBy default it will gather a 30s profile of the elastic-agent and output on stdout.", + Args: cobra.MaximumNArgs(5), + RunE: func(c *cobra.Command, args []string) error { + file, _ := c.Flags().GetString("file") + pprofType, _ := c.Flags().GetString("pprof-type") + d, _ := c.Flags().GetDuration("pprof-duration") + // get the command timeout value only if one is set explicitly. + // otherwise a value of 30s + pprof-duration will be used. + var timeout time.Duration + if c.Flags().Changed("timeout") { + timeout, _ = c.Flags().GetDuration("timeout") + } + + pprofApp, _ := c.Flags().GetString("pprof-application") + pprofRK, _ := c.Flags().GetString("pprof-route-key") + + return diagnosticsPprofCmd(streams, d, timeout, file, pprofType, pprofApp, pprofRK) + }, + } + + cmd.Flags().StringP("file", "f", "", "name of the output file, stdout if unspecified.") + cmd.Flags().String("pprof-type", "profile", "Collect all pprof data from all running applications. Select one of [allocs, block, cmdline, goroutine, heap, mutex, profile, threadcreate, trace]") + cmd.Flags().Duration("pprof-duration", time.Second*30, "The duration to collect trace and profiling data from the debug/pprof endpoints. (default: 30s)") + cmd.Flags().Duration("timeout", time.Second*60, "The timeout for the pprof collect command, defaults to 30s+pprof-duration by default. Should be longer then pprof-duration as the command needs time to process the response.") + cmd.Flags().String("pprof-application", "elastic-agent", "Application name to collect pprof data from.") + cmd.Flags().String("pprof-route-key", "default", "Route key to collect pprof data from.") return cmd } @@ -127,14 +175,22 @@ func diagnosticCmd(streams *cli.IOStreams, cmd *cobra.Command, args []string) er return outputFunc(streams.Out, diag) } -func diagnosticsCollectCmd(streams *cli.IOStreams, fileName, outputFormat string) error { +func diagnosticsCollectCmd(streams *cli.IOStreams, fileName, outputFormat string, pprof bool, pprofDur, cmdTimeout time.Duration) error { err := tryContainerLoadPaths() if err != nil { return err } ctx := handleSignal(context.Background()) - innerCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + // set command timeout to 30s or 30s+pprofDur if no timeout is specified + if cmdTimeout == time.Duration(0) { + cmdTimeout = time.Second * 30 + if pprof { + cmdTimeout += pprofDur + } + + } + innerCtx, cancel := context.WithTimeout(ctx, cmdTimeout) defer cancel() diag, err := getDiagnostics(innerCtx) @@ -151,7 +207,15 @@ func diagnosticsCollectCmd(streams *cli.IOStreams, fileName, outputFormat string return fmt.Errorf("unable to gather config data: %w", err) } - err = createZip(fileName, outputFormat, diag, cfg) + var pprofData map[string][]client.ProcPProf = nil + if pprof { + pprofData, err = getAllPprof(innerCtx, pprofDur) + if err != nil { + return fmt.Errorf("unable to gather pprof data: %w", err) + } + } + + err = createZip(fileName, outputFormat, diag, cfg, pprofData) if err != nil { return fmt.Errorf("unable to create archive %q: %w", fileName, err) } @@ -160,6 +224,68 @@ func diagnosticsCollectCmd(streams *cli.IOStreams, fileName, outputFormat string return nil } +func diagnosticsPprofCmd(streams *cli.IOStreams, dur, cmdTimeout time.Duration, outFile, pType, appName, rk string) error { + pt, ok := proto.PprofOption_value[strings.ToUpper(pType)] + if !ok { + return fmt.Errorf("unknown pprof-type %q, select one of [allocs, block, cmdline, goroutine, heap, mutex, profile, threadcreate, trace]", pType) + } + + // the elastic-agent application does not have a route key + if appName == "elastic-agent" { + rk = "" + } + + ctx := handleSignal(context.Background()) + // set cmdTimeout to 30s+dur if not set. + if cmdTimeout == time.Duration(0) { + cmdTimeout = time.Second*30 + dur + } + innerCtx, cancel := context.WithTimeout(ctx, cmdTimeout) + defer cancel() + + daemon := client.New() + err := daemon.Connect(ctx) + if err != nil { + return err + } + + pprofData, err := daemon.Pprof(innerCtx, dur, []proto.PprofOption{proto.PprofOption(pt)}, appName, rk) + if err != nil { + return err + } + + // validate response + pArr, ok := pprofData[proto.PprofOption_name[pt]] + if !ok { + return fmt.Errorf("route key %q not found in response data (map length: %d)", rk, len(pprofData)) + } + if len(pArr) != 1 { + return fmt.Errorf("pprof type length 1 expected, recieved %d", len(pArr)) + } + res := pArr[0] + + if res.Error != "" { + return fmt.Errorf(res.Error) + } + + // handle result + if outFile != "" { + f, err := os.Create(outFile) + if err != nil { + return err + } + defer f.Close() + _, err = f.Write(res.Result) + if err != nil { + return err + } + fmt.Fprintf(streams.Out, "pprof data written to %s\n", outFile) + return nil + } + _, err = streams.Out.Write(res.Result) + return err +} + func getDiagnostics(ctx context.Context) (DiagnosticsInfo, error) { daemon := client.New() diag := DiagnosticsInfo{} @@ -242,7 +368,7 @@ func gatherConfig() (AgentConfig, error) { // // The passed DiagnosticsInfo and AgentConfig data is written in the specified output format. // Any local log files are collected and copied into the archive. -func createZip(fileName, outputFormat string, diag DiagnosticsInfo, cfg AgentConfig) error { +func createZip(fileName, outputFormat string, diag DiagnosticsInfo, cfg AgentConfig, pprof map[string][]client.ProcPProf) error { f, err := os.Create(fileName) if err != nil { return err @@ -298,6 +424,13 @@ func createZip(fileName, outputFormat string, diag DiagnosticsInfo, cfg AgentCon return closeHandlers(err, zw, f) } + if pprof != nil { + err := zipProfs(zw, pprof) + if err != nil { + return closeHandlers(err, zw, f) + } + } + return closeHandlers(nil, zw, f) } @@ -371,3 +504,58 @@ func closeHandlers(err error, closers ...io.Closer) error { } return mErr.ErrorOrNil() } + +func getAllPprof(ctx context.Context, d time.Duration) (map[string][]client.ProcPProf, error) { + daemon := client.New() + err := daemon.Connect(ctx) + if err != nil { + return nil, err + } + pprofTypes := []proto.PprofOption{ + proto.PprofOption_ALLOCS, + proto.PprofOption_BLOCK, + proto.PprofOption_CMDLINE, + proto.PprofOption_GOROUTINE, + proto.PprofOption_HEAP, + proto.PprofOption_MUTEX, + proto.PprofOption_PROFILE, + proto.PprofOption_THREADCREATE, + proto.PprofOption_TRACE, + } + return daemon.Pprof(ctx, d, pprofTypes, "", "") +} + +func zipProfs(zw *zip.Writer, pprof map[string][]client.ProcPProf) error { + zf, err := zw.Create("pprof/") + if err != nil { + return err + } + for pType, profs := range pprof { + zf, err = zw.Create("pprof/" + pType + "/") + if err != nil { + return err + } + for _, p := range profs { + if p.Error != "" { + zf, err = zw.Create("pprof/" + pType + "/" + p.Name + "_" + p.RouteKey + "_error.txt") + if err != nil { + return err + } + _, err = zf.Write([]byte(p.Error)) + if err != nil { + return err + } + continue + } + zf, err = zw.Create("pprof/" + pType + "/" + p.Name + "_" + p.RouteKey + ".pprof") + if err != nil { + return err + } + _, err = zf.Write(p.Result) + if err != nil { + return err + } + } + } + return nil +} diff --git a/x-pack/elastic-agent/pkg/agent/cmd/run.go b/x-pack/elastic-agent/pkg/agent/cmd/run.go index 1c8c1dd4916..4ea0a7bcac7 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/run.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/run.go @@ -148,6 +148,7 @@ func run(streams *cli.IOStreams, override cfgOverrider) error { } control.SetRouteFn(app.Routes) + control.SetMonitoringCfg(cfg.Settings.MonitoringConfig) serverStopFn, err := setupMetrics(agentInfo, logger, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, app) if err != nil { @@ -313,7 +314,7 @@ func setupMetrics(agentInfo *info.AgentInfo, logger *logger.Logger, operatingSys } s.Start() - if cfg.Pprof { + if cfg.Pprof != nil && cfg.Pprof.Enabled { s.AttachPprof() } diff --git a/x-pack/elastic-agent/pkg/agent/control/client/client.go b/x-pack/elastic-agent/pkg/agent/control/client/client.go index f7332b9896e..900718bc6a6 100644 --- a/x-pack/elastic-agent/pkg/agent/control/client/client.go +++ b/x-pack/elastic-agent/pkg/agent/control/client/client.go @@ -72,6 +72,14 @@ type ProcMeta struct { Error string } +// ProcPProf returns pprof data for a process. +type ProcPProf struct { + Name string + RouteKey string + Result []byte + Error string +} + // AgentStatus is the current status of the Elastic Agent. type AgentStatus struct { Status Status @@ -95,6 +103,8 @@ type Client interface { Upgrade(ctx context.Context, version string, sourceURI string) (string, error) // ProcMeta gathers running process meta-data. ProcMeta(ctx context.Context) ([]ProcMeta, error) + // Pprof gathers data from the /debug/pprof/ endpoints specified. + Pprof(ctx context.Context, d time.Duration, pprofTypes []proto.PprofOption, appName, routeKey string) (map[string][]ProcPProf, error) } // client manages the state and communication to the Elastic Agent. @@ -247,3 +257,29 @@ func (c *client) ProcMeta(ctx context.Context) ([]ProcMeta, error) { } return procMeta, nil } + +// Pprof gathers /debug/pprof data and returns a map of pprof-type: ProcPProf data +func (c *client) Pprof(ctx context.Context, d time.Duration, pprofTypes []proto.PprofOption, appName, routeKey string) (map[string][]ProcPProf, error) { + resp, err := c.client.Pprof(ctx, &proto.PprofRequest{ + PprofType: pprofTypes, + TraceDuration: d.String(), + AppName: appName, + RouteKey: routeKey, + }) + if err != nil { + return nil, err + } + res := map[string][]ProcPProf{} + for _, pType := range pprofTypes { + res[pType.String()] = make([]ProcPProf, 0) + } + for _, r := range resp.Results { + res[r.PprofType.String()] = append(res[r.PprofType.String()], ProcPProf{ + Name: r.AppName, + RouteKey: r.RouteKey, + Result: r.Result, + Error: r.Error, + }) + } + return res, nil +} diff --git a/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go b/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go index 2cdec52cf65..70c66acd4ab 100644 --- a/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go +++ b/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go @@ -143,6 +143,74 @@ func (ActionStatus) EnumDescriptor() ([]byte, []int) { return file_control_proto_rawDescGZIP(), []int{1} } +// pprof endpoint that can be requested. +type PprofOption int32 + +const ( + PprofOption_ALLOCS PprofOption = 0 + PprofOption_BLOCK PprofOption = 1 + PprofOption_CMDLINE PprofOption = 2 + PprofOption_GOROUTINE PprofOption = 3 + PprofOption_HEAP PprofOption = 4 + PprofOption_MUTEX PprofOption = 5 + PprofOption_PROFILE PprofOption = 6 + PprofOption_THREADCREATE PprofOption = 7 + PprofOption_TRACE PprofOption = 8 +) + +// Enum value maps for PprofOption. +var ( + PprofOption_name = map[int32]string{ + 0: "ALLOCS", + 1: "BLOCK", + 2: "CMDLINE", + 3: "GOROUTINE", + 4: "HEAP", + 5: "MUTEX", + 6: "PROFILE", + 7: "THREADCREATE", + 8: "TRACE", + } + PprofOption_value = map[string]int32{ + "ALLOCS": 0, + "BLOCK": 1, + "CMDLINE": 2, + "GOROUTINE": 3, + "HEAP": 4, + "MUTEX": 5, + "PROFILE": 6, + "THREADCREATE": 7, + "TRACE": 8, + } +) + +func (x PprofOption) Enum() *PprofOption { + p := new(PprofOption) + *p = x + return p +} + +func (x PprofOption) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (PprofOption) Descriptor() protoreflect.EnumDescriptor { + return file_control_proto_enumTypes[2].Descriptor() +} + +func (PprofOption) Type() protoreflect.EnumType { + return &file_control_proto_enumTypes[2] +} + +func (x PprofOption) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use PprofOption.Descriptor instead. +func (PprofOption) EnumDescriptor() ([]byte, []int) { + return file_control_proto_rawDescGZIP(), []int{2} +} + // Empty message. type Empty struct { state protoimpl.MessageState @@ -807,6 +875,210 @@ func (x *ProcMetaResponse) GetProcs() []*ProcMeta { return nil } +// PprofRequest is a request for pprof data from and http/pprof endpoint. +type PprofRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The profiles that are requested + PprofType []PprofOption `protobuf:"varint,1,rep,packed,name=pprofType,proto3,enum=proto.PprofOption" json:"pprofType,omitempty"` + // A string representing a time.Duration to apply to trace, and profile options. + TraceDuration string `protobuf:"bytes,2,opt,name=traceDuration,proto3" json:"traceDuration,omitempty"` + // The application that will be profiled, if empty all applications are profiled. + AppName string `protobuf:"bytes,3,opt,name=appName,proto3" json:"appName,omitempty"` + // The route key to match for profiling, if empty all are profiled. + RouteKey string `protobuf:"bytes,4,opt,name=routeKey,proto3" json:"routeKey,omitempty"` +} + +func (x *PprofRequest) Reset() { + *x = PprofRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_control_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PprofRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PprofRequest) ProtoMessage() {} + +func (x *PprofRequest) ProtoReflect() protoreflect.Message { + mi := &file_control_proto_msgTypes[9] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PprofRequest.ProtoReflect.Descriptor instead. +func (*PprofRequest) Descriptor() ([]byte, []int) { + return file_control_proto_rawDescGZIP(), []int{9} +} + +func (x *PprofRequest) GetPprofType() []PprofOption { + if x != nil { + return x.PprofType + } + return nil +} + +func (x *PprofRequest) GetTraceDuration() string { + if x != nil { + return x.TraceDuration + } + return "" +} + +func (x *PprofRequest) GetAppName() string { + if x != nil { + return x.AppName + } + return "" +} + +func (x *PprofRequest) GetRouteKey() string { + if x != nil { + return x.RouteKey + } + return "" +} + +// PprofResult is the result of a pprof request for a given application/route key. +type PprofResult struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + AppName string `protobuf:"bytes,1,opt,name=appName,proto3" json:"appName,omitempty"` + RouteKey string `protobuf:"bytes,2,opt,name=routeKey,proto3" json:"routeKey,omitempty"` + PprofType PprofOption `protobuf:"varint,3,opt,name=pprofType,proto3,enum=proto.PprofOption" json:"pprofType,omitempty"` + Result []byte `protobuf:"bytes,4,opt,name=result,proto3" json:"result,omitempty"` + Error string `protobuf:"bytes,5,opt,name=error,proto3" json:"error,omitempty"` +} + +func (x *PprofResult) Reset() { + *x = PprofResult{} + if protoimpl.UnsafeEnabled { + mi := &file_control_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PprofResult) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PprofResult) ProtoMessage() {} + +func (x *PprofResult) ProtoReflect() protoreflect.Message { + mi := &file_control_proto_msgTypes[10] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PprofResult.ProtoReflect.Descriptor instead. +func (*PprofResult) Descriptor() ([]byte, []int) { + return file_control_proto_rawDescGZIP(), []int{10} +} + +func (x *PprofResult) GetAppName() string { + if x != nil { + return x.AppName + } + return "" +} + +func (x *PprofResult) GetRouteKey() string { + if x != nil { + return x.RouteKey + } + return "" +} + +func (x *PprofResult) GetPprofType() PprofOption { + if x != nil { + return x.PprofType + } + return PprofOption_ALLOCS +} + +func (x *PprofResult) GetResult() []byte { + if x != nil { + return x.Result + } + return nil +} + +func (x *PprofResult) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +// PprofResponse is a wrapper to return all pprof responses. +type PprofResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Results []*PprofResult `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"` +} + +func (x *PprofResponse) Reset() { + *x = PprofResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_control_proto_msgTypes[11] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PprofResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PprofResponse) ProtoMessage() {} + +func (x *PprofResponse) ProtoReflect() protoreflect.Message { + mi := &file_control_proto_msgTypes[11] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PprofResponse.ProtoReflect.Descriptor instead. +func (*PprofResponse) Descriptor() ([]byte, []int) { + return file_control_proto_rawDescGZIP(), []int{11} +} + +func (x *PprofResponse) GetResults() []*PprofResult { + if x != nil { + return x.Results + } + return nil +} + var File_control_proto protoreflect.FileDescriptor var file_control_proto_rawDesc = []byte{ @@ -886,37 +1158,73 @@ var file_control_proto_rawDesc = []byte{ 0x10, 0x50, 0x72, 0x6f, 0x63, 0x4d, 0x65, 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x70, 0x72, 0x6f, 0x63, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x72, 0x6f, 0x63, 0x4d, 0x65, 0x74, - 0x61, 0x52, 0x05, 0x70, 0x72, 0x6f, 0x63, 0x73, 0x2a, 0x79, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, - 0x75, 0x73, 0x12, 0x0c, 0x0a, 0x08, 0x53, 0x54, 0x41, 0x52, 0x54, 0x49, 0x4e, 0x47, 0x10, 0x00, - 0x12, 0x0f, 0x0a, 0x0b, 0x43, 0x4f, 0x4e, 0x46, 0x49, 0x47, 0x55, 0x52, 0x49, 0x4e, 0x47, 0x10, - 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x48, 0x45, 0x41, 0x4c, 0x54, 0x48, 0x59, 0x10, 0x02, 0x12, 0x0c, - 0x0a, 0x08, 0x44, 0x45, 0x47, 0x52, 0x41, 0x44, 0x45, 0x44, 0x10, 0x03, 0x12, 0x0a, 0x0a, 0x06, - 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x04, 0x12, 0x0c, 0x0a, 0x08, 0x53, 0x54, 0x4f, 0x50, - 0x50, 0x49, 0x4e, 0x47, 0x10, 0x05, 0x12, 0x0d, 0x0a, 0x09, 0x55, 0x50, 0x47, 0x52, 0x41, 0x44, - 0x49, 0x4e, 0x47, 0x10, 0x06, 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x4f, 0x4c, 0x4c, 0x42, 0x41, 0x43, - 0x4b, 0x10, 0x07, 0x2a, 0x28, 0x0a, 0x0c, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0x00, - 0x12, 0x0b, 0x0a, 0x07, 0x46, 0x41, 0x49, 0x4c, 0x55, 0x52, 0x45, 0x10, 0x01, 0x32, 0x93, 0x02, - 0x0a, 0x13, 0x45, 0x6c, 0x61, 0x73, 0x74, 0x69, 0x63, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x43, 0x6f, - 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x12, 0x2f, 0x0a, 0x07, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, - 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2d, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, - 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x15, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2f, 0x0a, 0x07, 0x52, 0x65, 0x73, 0x74, 0x61, 0x72, 0x74, - 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x65, 0x73, 0x74, 0x61, 0x72, 0x74, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x38, 0x0a, 0x07, 0x55, 0x70, 0x67, 0x72, 0x61, 0x64, - 0x65, 0x12, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x55, 0x70, 0x67, 0x72, 0x61, 0x64, - 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2e, 0x55, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x31, 0x0a, 0x08, 0x50, 0x72, 0x6f, 0x63, 0x4d, 0x65, 0x74, 0x61, 0x12, 0x0c, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x17, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x2e, 0x50, 0x72, 0x6f, 0x63, 0x4d, 0x65, 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x42, 0x22, 0x5a, 0x1d, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x67, 0x65, 0x6e, 0x74, - 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0xf8, 0x01, 0x01, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x61, 0x52, 0x05, 0x70, 0x72, 0x6f, 0x63, 0x73, 0x22, 0x9c, 0x01, 0x0a, 0x0c, 0x50, 0x70, 0x72, + 0x6f, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x30, 0x0a, 0x09, 0x70, 0x70, 0x72, + 0x6f, 0x66, 0x54, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0e, 0x32, 0x12, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x70, 0x72, 0x6f, 0x66, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, + 0x52, 0x09, 0x70, 0x70, 0x72, 0x6f, 0x66, 0x54, 0x79, 0x70, 0x65, 0x12, 0x24, 0x0a, 0x0d, 0x74, + 0x72, 0x61, 0x63, 0x65, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0d, 0x74, 0x72, 0x61, 0x63, 0x65, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x70, 0x70, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x07, 0x61, 0x70, 0x70, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, + 0x6f, 0x75, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, + 0x6f, 0x75, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x22, 0xa3, 0x01, 0x0a, 0x0b, 0x50, 0x70, 0x72, 0x6f, + 0x66, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x70, 0x70, 0x4e, 0x61, + 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x70, 0x70, 0x4e, 0x61, 0x6d, + 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, + 0x09, 0x70, 0x70, 0x72, 0x6f, 0x66, 0x54, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, + 0x32, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x70, 0x72, 0x6f, 0x66, 0x4f, 0x70, + 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x70, 0x72, 0x6f, 0x66, 0x54, 0x79, 0x70, 0x65, 0x12, + 0x16, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, + 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, + 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0x3d, 0x0a, + 0x0d, 0x50, 0x70, 0x72, 0x6f, 0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2c, + 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, + 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x70, 0x72, 0x6f, 0x66, 0x52, 0x65, 0x73, + 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x2a, 0x79, 0x0a, 0x06, + 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0c, 0x0a, 0x08, 0x53, 0x54, 0x41, 0x52, 0x54, 0x49, + 0x4e, 0x47, 0x10, 0x00, 0x12, 0x0f, 0x0a, 0x0b, 0x43, 0x4f, 0x4e, 0x46, 0x49, 0x47, 0x55, 0x52, + 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x48, 0x45, 0x41, 0x4c, 0x54, 0x48, 0x59, + 0x10, 0x02, 0x12, 0x0c, 0x0a, 0x08, 0x44, 0x45, 0x47, 0x52, 0x41, 0x44, 0x45, 0x44, 0x10, 0x03, + 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x04, 0x12, 0x0c, 0x0a, 0x08, + 0x53, 0x54, 0x4f, 0x50, 0x50, 0x49, 0x4e, 0x47, 0x10, 0x05, 0x12, 0x0d, 0x0a, 0x09, 0x55, 0x50, + 0x47, 0x52, 0x41, 0x44, 0x49, 0x4e, 0x47, 0x10, 0x06, 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x4f, 0x4c, + 0x4c, 0x42, 0x41, 0x43, 0x4b, 0x10, 0x07, 0x2a, 0x28, 0x0a, 0x0c, 0x41, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x55, 0x43, 0x43, 0x45, + 0x53, 0x53, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x46, 0x41, 0x49, 0x4c, 0x55, 0x52, 0x45, 0x10, + 0x01, 0x2a, 0x7f, 0x0a, 0x0b, 0x50, 0x70, 0x72, 0x6f, 0x66, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, + 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x4c, 0x4c, 0x4f, 0x43, 0x53, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, + 0x42, 0x4c, 0x4f, 0x43, 0x4b, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x43, 0x4d, 0x44, 0x4c, 0x49, + 0x4e, 0x45, 0x10, 0x02, 0x12, 0x0d, 0x0a, 0x09, 0x47, 0x4f, 0x52, 0x4f, 0x55, 0x54, 0x49, 0x4e, + 0x45, 0x10, 0x03, 0x12, 0x08, 0x0a, 0x04, 0x48, 0x45, 0x41, 0x50, 0x10, 0x04, 0x12, 0x09, 0x0a, + 0x05, 0x4d, 0x55, 0x54, 0x45, 0x58, 0x10, 0x05, 0x12, 0x0b, 0x0a, 0x07, 0x50, 0x52, 0x4f, 0x46, + 0x49, 0x4c, 0x45, 0x10, 0x06, 0x12, 0x10, 0x0a, 0x0c, 0x54, 0x48, 0x52, 0x45, 0x41, 0x44, 0x43, + 0x52, 0x45, 0x41, 0x54, 0x45, 0x10, 0x07, 0x12, 0x09, 0x0a, 0x05, 0x54, 0x52, 0x41, 0x43, 0x45, + 0x10, 0x08, 0x32, 0xc7, 0x02, 0x0a, 0x13, 0x45, 0x6c, 0x61, 0x73, 0x74, 0x69, 0x63, 0x41, 0x67, + 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x12, 0x2f, 0x0a, 0x07, 0x56, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, + 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x56, 0x65, 0x72, 0x73, + 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2d, 0x0a, 0x06, 0x53, + 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, + 0x70, 0x74, 0x79, 0x1a, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2f, 0x0a, 0x07, 0x52, 0x65, + 0x73, 0x74, 0x61, 0x72, 0x74, 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, + 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x65, 0x73, 0x74, + 0x61, 0x72, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x38, 0x0a, 0x07, 0x55, + 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x12, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x55, + 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x55, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x31, 0x0a, 0x08, 0x50, 0x72, 0x6f, 0x63, 0x4d, 0x65, 0x74, + 0x61, 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, + 0x17, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x72, 0x6f, 0x63, 0x4d, 0x65, 0x74, 0x61, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x32, 0x0a, 0x05, 0x50, 0x70, 0x72, 0x6f, + 0x66, 0x12, 0x13, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x70, 0x72, 0x6f, 0x66, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, + 0x70, 0x72, 0x6f, 0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x22, 0x5a, 0x1d, + 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, + 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0xf8, 0x01, 0x01, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -931,43 +1239,52 @@ func file_control_proto_rawDescGZIP() []byte { return file_control_proto_rawDescData } -var file_control_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_control_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_control_proto_enumTypes = make([]protoimpl.EnumInfo, 3) +var file_control_proto_msgTypes = make([]protoimpl.MessageInfo, 12) var file_control_proto_goTypes = []interface{}{ (Status)(0), // 0: proto.Status (ActionStatus)(0), // 1: proto.ActionStatus - (*Empty)(nil), // 2: proto.Empty - (*VersionResponse)(nil), // 3: proto.VersionResponse - (*RestartResponse)(nil), // 4: proto.RestartResponse - (*UpgradeRequest)(nil), // 5: proto.UpgradeRequest - (*UpgradeResponse)(nil), // 6: proto.UpgradeResponse - (*ApplicationStatus)(nil), // 7: proto.ApplicationStatus - (*ProcMeta)(nil), // 8: proto.ProcMeta - (*StatusResponse)(nil), // 9: proto.StatusResponse - (*ProcMetaResponse)(nil), // 10: proto.ProcMetaResponse + (PprofOption)(0), // 2: proto.PprofOption + (*Empty)(nil), // 3: proto.Empty + (*VersionResponse)(nil), // 4: proto.VersionResponse + (*RestartResponse)(nil), // 5: proto.RestartResponse + (*UpgradeRequest)(nil), // 6: proto.UpgradeRequest + (*UpgradeResponse)(nil), // 7: proto.UpgradeResponse + (*ApplicationStatus)(nil), // 8: proto.ApplicationStatus + (*ProcMeta)(nil), // 9: proto.ProcMeta + (*StatusResponse)(nil), // 10: proto.StatusResponse + (*ProcMetaResponse)(nil), // 11: proto.ProcMetaResponse + (*PprofRequest)(nil), // 12: proto.PprofRequest + (*PprofResult)(nil), // 13: proto.PprofResult + (*PprofResponse)(nil), // 14: proto.PprofResponse } var file_control_proto_depIdxs = []int32{ 1, // 0: proto.RestartResponse.status:type_name -> proto.ActionStatus 1, // 1: proto.UpgradeResponse.status:type_name -> proto.ActionStatus 0, // 2: proto.ApplicationStatus.status:type_name -> proto.Status 0, // 3: proto.StatusResponse.status:type_name -> proto.Status - 7, // 4: proto.StatusResponse.applications:type_name -> proto.ApplicationStatus - 8, // 5: proto.ProcMetaResponse.procs:type_name -> proto.ProcMeta - 2, // 6: proto.ElasticAgentControl.Version:input_type -> proto.Empty - 2, // 7: proto.ElasticAgentControl.Status:input_type -> proto.Empty - 2, // 8: proto.ElasticAgentControl.Restart:input_type -> proto.Empty - 5, // 9: proto.ElasticAgentControl.Upgrade:input_type -> proto.UpgradeRequest - 2, // 10: proto.ElasticAgentControl.ProcMeta:input_type -> proto.Empty - 3, // 11: proto.ElasticAgentControl.Version:output_type -> proto.VersionResponse - 9, // 12: proto.ElasticAgentControl.Status:output_type -> proto.StatusResponse - 4, // 13: proto.ElasticAgentControl.Restart:output_type -> proto.RestartResponse - 6, // 14: proto.ElasticAgentControl.Upgrade:output_type -> proto.UpgradeResponse - 10, // 15: proto.ElasticAgentControl.ProcMeta:output_type -> proto.ProcMetaResponse - 11, // [11:16] is the sub-list for method output_type - 6, // [6:11] is the sub-list for method input_type - 6, // [6:6] is the sub-list for extension type_name - 6, // [6:6] is the sub-list for extension extendee - 0, // [0:6] is the sub-list for field type_name + 8, // 4: proto.StatusResponse.applications:type_name -> proto.ApplicationStatus + 9, // 5: proto.ProcMetaResponse.procs:type_name -> proto.ProcMeta + 2, // 6: proto.PprofRequest.pprofType:type_name -> proto.PprofOption + 2, // 7: proto.PprofResult.pprofType:type_name -> proto.PprofOption + 13, // 8: proto.PprofResponse.results:type_name -> proto.PprofResult + 3, // 9: proto.ElasticAgentControl.Version:input_type -> proto.Empty + 3, // 10: proto.ElasticAgentControl.Status:input_type -> proto.Empty + 3, // 11: proto.ElasticAgentControl.Restart:input_type -> proto.Empty + 6, // 12: proto.ElasticAgentControl.Upgrade:input_type -> proto.UpgradeRequest + 3, // 13: proto.ElasticAgentControl.ProcMeta:input_type -> proto.Empty + 12, // 14: proto.ElasticAgentControl.Pprof:input_type -> proto.PprofRequest + 4, // 15: proto.ElasticAgentControl.Version:output_type -> proto.VersionResponse + 10, // 16: proto.ElasticAgentControl.Status:output_type -> proto.StatusResponse + 5, // 17: proto.ElasticAgentControl.Restart:output_type -> proto.RestartResponse + 7, // 18: proto.ElasticAgentControl.Upgrade:output_type -> proto.UpgradeResponse + 11, // 19: proto.ElasticAgentControl.ProcMeta:output_type -> proto.ProcMetaResponse + 14, // 20: proto.ElasticAgentControl.Pprof:output_type -> proto.PprofResponse + 15, // [15:21] is the sub-list for method output_type + 9, // [9:15] is the sub-list for method input_type + 9, // [9:9] is the sub-list for extension type_name + 9, // [9:9] is the sub-list for extension extendee + 0, // [0:9] is the sub-list for field type_name } func init() { file_control_proto_init() } @@ -1084,14 +1401,50 @@ func file_control_proto_init() { return nil } } + file_control_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PprofRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_control_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PprofResult); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_control_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PprofResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_control_proto_rawDesc, - NumEnums: 2, - NumMessages: 9, + NumEnums: 3, + NumMessages: 12, NumExtensions: 0, NumServices: 1, }, @@ -1128,6 +1481,8 @@ type ElasticAgentControlClient interface { Upgrade(ctx context.Context, in *UpgradeRequest, opts ...grpc.CallOption) (*UpgradeResponse, error) // Gather all running process metadata. ProcMeta(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*ProcMetaResponse, error) + // Gather requested pprof data from specified applications. + Pprof(ctx context.Context, in *PprofRequest, opts ...grpc.CallOption) (*PprofResponse, error) } type elasticAgentControlClient struct { @@ -1183,6 +1538,15 @@ func (c *elasticAgentControlClient) ProcMeta(ctx context.Context, in *Empty, opt return out, nil } +func (c *elasticAgentControlClient) Pprof(ctx context.Context, in *PprofRequest, opts ...grpc.CallOption) (*PprofResponse, error) { + out := new(PprofResponse) + err := c.cc.Invoke(ctx, "/proto.ElasticAgentControl/Pprof", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // ElasticAgentControlServer is the server API for ElasticAgentControl service. type ElasticAgentControlServer interface { // Fetches the currently running version of the Elastic Agent. @@ -1195,6 +1559,8 @@ type ElasticAgentControlServer interface { Upgrade(context.Context, *UpgradeRequest) (*UpgradeResponse, error) // Gather all running process metadata. ProcMeta(context.Context, *Empty) (*ProcMetaResponse, error) + // Gather requested pprof data from specified applications. + Pprof(context.Context, *PprofRequest) (*PprofResponse, error) } // UnimplementedElasticAgentControlServer can be embedded to have forward compatible implementations. @@ -1216,6 +1582,9 @@ func (*UnimplementedElasticAgentControlServer) Upgrade(context.Context, *Upgrade func (*UnimplementedElasticAgentControlServer) ProcMeta(context.Context, *Empty) (*ProcMetaResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ProcMeta not implemented") } +func (*UnimplementedElasticAgentControlServer) Pprof(context.Context, *PprofRequest) (*PprofResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Pprof not implemented") +} func RegisterElasticAgentControlServer(s *grpc.Server, srv ElasticAgentControlServer) { s.RegisterService(&_ElasticAgentControl_serviceDesc, srv) @@ -1311,6 +1680,24 @@ func _ElasticAgentControl_ProcMeta_Handler(srv interface{}, ctx context.Context, return interceptor(ctx, in, info, handler) } +func _ElasticAgentControl_Pprof_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PprofRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ElasticAgentControlServer).Pprof(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/proto.ElasticAgentControl/Pprof", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ElasticAgentControlServer).Pprof(ctx, req.(*PprofRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _ElasticAgentControl_serviceDesc = grpc.ServiceDesc{ ServiceName: "proto.ElasticAgentControl", HandlerType: (*ElasticAgentControlServer)(nil), @@ -1335,6 +1722,10 @@ var _ElasticAgentControl_serviceDesc = grpc.ServiceDesc{ MethodName: "ProcMeta", Handler: _ElasticAgentControl_ProcMeta_Handler, }, + { + MethodName: "Pprof", + Handler: _ElasticAgentControl_Pprof_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "control.proto", diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go index 072b212a771..12ed4650eed 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/server.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -7,6 +7,8 @@ package server import ( "context" "encoding/json" + "fmt" + "io" "net" "net/http" "runtime" @@ -23,7 +25,9 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/beats" monitoring "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/beats" + monitoringCfg "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/socket" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" @@ -33,20 +37,27 @@ import ( // Server is the daemon side of the control protocol. type Server struct { - logger *logger.Logger - rex reexec.ExecManager - statusCtrl status.Controller - up *upgrade.Upgrader - routeFn func() *sorted.Set - listener net.Listener - server *grpc.Server - lock sync.RWMutex + logger *logger.Logger + rex reexec.ExecManager + statusCtrl status.Controller + up *upgrade.Upgrader + routeFn func() *sorted.Set + monitoringCfg *monitoringCfg.MonitoringConfig + listener net.Listener + server *grpc.Server + lock sync.RWMutex } type specer interface { Specs() map[string]program.Spec } +type specInfo struct { + spec program.Spec + app string + rk string +} + // New creates a new control protocol server. func New(log *logger.Logger, rex reexec.ExecManager, statusCtrl status.Controller, up *upgrade.Upgrader) *Server { return &Server{ @@ -71,6 +82,14 @@ func (s *Server) SetRouteFn(routesFetchFn func() *sorted.Set) { s.routeFn = routesFetchFn } +// SetMonitoringCfg sets a reference to the monitoring config used by the running agent. +// the controller references this config to find out if pprof is enabled for the agent or not +func (s *Server) SetMonitoringCfg(cfg *monitoringCfg.MonitoringConfig) { + s.lock.Lock() + defer s.lock.Unlock() + s.monitoringCfg = cfg +} + // Start starts the GRPC endpoint and accepts new connections. func (s *Server) Start() error { if s.server != nil { @@ -194,84 +213,273 @@ func (s *Server) ProcMeta(ctx context.Context, _ *proto.Empty) (*proto.ProcMetaR Procs: []*proto.ProcMeta{}, } + // gather spec data for all rk/apps running + specs := s.getSpecInfo("", "") + for _, si := range specs { + endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk) + client := newSocketRequester(si.app, si.rk, endpoint) + + procMeta := client.procMeta(ctx) + resp.Procs = append(resp.Procs, procMeta) + } + + return resp, nil +} + +// Pprof returns /debug/pprof data for the requested applicaiont-route_key or all running applications. +func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.PprofResponse, error) { + if s.monitoringCfg == nil || s.monitoringCfg.Pprof == nil || !s.monitoringCfg.Pprof.Enabled { + return nil, fmt.Errorf("agent.monitoring.pprof disabled") + } + + if s.routeFn == nil { + return nil, errors.New("route function is nil") + } + + dur, err := time.ParseDuration(req.TraceDuration) + if err != nil { + return nil, fmt.Errorf("unable to parse trace duration: %w", err) + } + + resp := &proto.PprofResponse{ + Results: []*proto.PprofResult{}, + } + + var wg sync.WaitGroup + ch := make(chan *proto.PprofResult, 1) + + // retrieve elastic-agent pprof data if requested or application is unspecified. + if req.AppName == "" || req.AppName == "elastic-agent" { + endpoint := beats.AgentMonitoringEndpoint(runtime.GOOS, s.monitoringCfg.HTTP) + c := newSocketRequester("elastic-agent", "", endpoint) + for _, opt := range req.PprofType { + wg.Add(1) + go func(opt proto.PprofOption) { + res := c.getPprof(ctx, opt, dur) + ch <- res + wg.Done() + }(opt) + } + } + + // get requested rk/appname spec or all specs + var specs []specInfo + if req.AppName != "elastic-agent" { + specs = s.getSpecInfo(req.RouteKey, req.AppName) + } + for _, si := range specs { + endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk) + c := newSocketRequester(si.app, si.rk, endpoint) + // Launch a concurrent goroutine to gather all pprof endpoints from a socket. + for _, opt := range req.PprofType { + wg.Add(1) + go func(opt proto.PprofOption) { + res := c.getPprof(ctx, opt, dur) + ch <- res + wg.Done() + }(opt) + } + } + + // wait for the waitgroup to be done and close the channel + go func() { + wg.Wait() + close(ch) + }() + + // gather all results from channel until closed. + for res := range ch { + resp.Results = append(resp.Results, res) + } + return resp, nil +} + +// getSpecs will return the specs for the program associated with the specified route key/app name, or all programs if no key(s) are specified. +// if matchRK or matchApp are empty all results will be returned. +func (s *Server) getSpecInfo(matchRK, matchApp string) []specInfo { routes := s.routeFn() + + // find specInfo for a specified rk/app + if matchRK != "" && matchApp != "" { + programs, ok := routes.Get(matchRK) + if !ok { + s.logger.With("route_key", matchRK).Debug("No matching route key found.") + return []specInfo{} + } + sp, ok := programs.(specer) + if !ok { + s.logger.With("route_key", matchRK, "route", programs).Warn("Unable to cast route as specer.") + return []specInfo{} + } + specs := sp.Specs() + + spec, ok := specs[matchApp] + if !ok { + s.logger.With("route_key", matchRK, "application_name", matchApp).Debug("No matching route key/application name found.") + return []specInfo{} + } + return []specInfo{specInfo{spec: spec, app: matchApp, rk: matchRK}} + } + + // gather specInfo for all rk/app values + res := make([]specInfo, 0) for _, rk := range routes.Keys() { programs, ok := routes.Get(rk) if !ok { + // we do not expect to ever hit this code path + // if this log message occurs then the agent is unable to access one of the keys that is returned by the route function + // might be a race condition if someone tries to update the policy to remove an output? s.logger.With("route_key", rk).Warn("Unable to retrieve route.") continue } - sp, ok := programs.(specer) if !ok { - s.logger.With("route_key", rk, "route", programs).Warn("Unable to cast route as specer.") + s.logger.With("route_key", matchRK, "route", programs).Warn("Unable to cast route as specer.") continue } - specs := sp.Specs() + for n, spec := range sp.Specs() { + res = append(res, specInfo{ + rk: rk, + app: n, + spec: spec, + }) + } + } + return res +} + +// socketRequester is a struct to gather (diagnostics) data from a socket opened by elastic-agent or one if it's processes +type socketRequester struct { + c http.Client + endpoint string + appName string + routeKey string +} - for n, spec := range specs { - procMeta := &proto.ProcMeta{ - Name: n, - RouteKey: rk, - } - - client := http.Client{ - Timeout: time.Second * 5, - } - endpoint := monitoring.MonitoringEndpoint(spec, runtime.GOOS, rk) - if strings.HasPrefix(endpoint, "unix://") { - client.Transport = &http.Transport{ - Proxy: nil, - DialContext: socket.DialContext(strings.TrimPrefix(endpoint, "unix://")), - } - endpoint = "unix" - } else if strings.HasPrefix(endpoint, "npipe://") { - client.Transport = &http.Transport{ - Proxy: nil, - DialContext: socket.DialContext(strings.TrimPrefix(endpoint, "npipe:///")), - } - endpoint = "npipe" - } - - res, err := client.Get("http://" + endpoint + "/") - if err != nil { - procMeta.Error = err.Error() - resp.Procs = append(resp.Procs, procMeta) - continue - } - if res.StatusCode != 200 { - procMeta.Error = "response status is: " + res.Status - resp.Procs = append(resp.Procs, procMeta) - continue - } - - bi := &BeatInfo{} - dec := json.NewDecoder(res.Body) - if err := dec.Decode(bi); err != nil { - res.Body.Close() - procMeta.Error = err.Error() - resp.Procs = append(resp.Procs, procMeta) - continue - } - res.Body.Close() - - procMeta.Process = bi.Beat - procMeta.Hostname = bi.Hostname - procMeta.Id = bi.ID - procMeta.EphemeralId = bi.EphemeralID - procMeta.Version = bi.Version - procMeta.BuildCommit = bi.Commit - procMeta.BuildTime = bi.Time - procMeta.Username = bi.Username - procMeta.UserId = bi.UserID - procMeta.UserGid = bi.GroupID - procMeta.Architecture = bi.BinaryArch - procMeta.ElasticLicensed = bi.ElasticLicensed - - resp.Procs = append(resp.Procs, procMeta) +func newSocketRequester(appName, routeKey, endpoint string) *socketRequester { + c := http.Client{} + if strings.HasPrefix(endpoint, "unix://") { + c.Transport = &http.Transport{ + Proxy: nil, + DialContext: socket.DialContext(strings.TrimPrefix(endpoint, "unix://")), + } + endpoint = "unix" + } else if strings.HasPrefix(endpoint, "npipe://") { + c.Transport = &http.Transport{ + Proxy: nil, + DialContext: socket.DialContext(strings.TrimPrefix(endpoint, "npipe:///")), } + endpoint = "npipe" } - return resp, nil + return &socketRequester{ + c: c, + appName: appName, + routeKey: routeKey, + endpoint: endpoint, + } +} + +// getPath creates a get request for the specified path. +// Will return an error if that status code is not 200. +func (r *socketRequester) getPath(ctx context.Context, path string) (*http.Response, error) { + req, err := http.NewRequest("GET", "http://"+r.endpoint+path, nil) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + res, err := r.c.Do(req) + if err != nil { + return nil, err + } + if res.StatusCode != 200 { + res.Body.Close() + return nil, fmt.Errorf("response status is %d", res.StatusCode) + } + return res, nil + +} + +// procMeta will return process metadata by querying the "/" path. +func (r *socketRequester) procMeta(ctx context.Context) *proto.ProcMeta { + pm := &proto.ProcMeta{ + Name: r.appName, + RouteKey: r.routeKey, + } + + res, err := r.getPath(ctx, "/") + if err != nil { + pm.Error = err.Error() + return pm + } + defer res.Body.Close() + + bi := &BeatInfo{} + dec := json.NewDecoder(res.Body) + if err := dec.Decode(bi); err != nil { + pm.Error = err.Error() + return pm + } + + pm.Process = bi.Beat + pm.Hostname = bi.Hostname + pm.Id = bi.ID + pm.EphemeralId = bi.EphemeralID + pm.Version = bi.Version + pm.BuildCommit = bi.Commit + pm.BuildTime = bi.Time + pm.Username = bi.Username + pm.UserId = bi.UserID + pm.UserGid = bi.GroupID + pm.Architecture = bi.BinaryArch + pm.ElasticLicensed = bi.ElasticLicensed + + return pm +} + +var pprofEndpoints = map[proto.PprofOption]string{ + proto.PprofOption_ALLOCS: "/debug/pprof/allocs", + proto.PprofOption_BLOCK: "/debug/pprof/block", + proto.PprofOption_CMDLINE: "/debug/pprof/cmdline", + proto.PprofOption_GOROUTINE: "/debug/pprof/goroutine", + proto.PprofOption_HEAP: "/debug/pprof/heap", + proto.PprofOption_MUTEX: "/debug/pprof/mutex", + proto.PprofOption_PROFILE: "/debug/pprof/profile", + proto.PprofOption_THREADCREATE: "/debug/pprof/threadcreate", + proto.PprofOption_TRACE: "/debug/pprof/trace", +} + +// getProf will gather pprof data specified by the option. +func (r *socketRequester) getPprof(ctx context.Context, opt proto.PprofOption, dur time.Duration) *proto.PprofResult { + res := &proto.PprofResult{ + AppName: r.appName, + RouteKey: r.routeKey, + PprofType: opt, + } + + path, ok := pprofEndpoints[opt] + if !ok { + res.Error = "unknown path for option" + return res + } + + if opt == proto.PprofOption_PROFILE || opt == proto.PprofOption_TRACE { + path += fmt.Sprintf("?seconds=%0.f", dur.Seconds()) + } + + resp, err := r.getPath(ctx, path) + if err != nil { + res.Error = err.Error() + return res + } + defer resp.Body.Close() + + p, err := io.ReadAll(resp.Body) + if err != nil { + res.Error = err.Error() + return res + } + res.Result = p + return res } type upgradeRequest struct { diff --git a/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go b/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go index cb0519f9806..939aa89c99d 100644 --- a/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go +++ b/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go @@ -35,6 +35,7 @@ type Monitor struct { func NewMonitor(downloadConfig *artifact.Config, monitoringCfg *monitoringConfig.MonitoringConfig, logMetrics bool) *Monitor { if monitoringCfg == nil { monitoringCfg = monitoringConfig.DefaultConfig() + monitoringCfg.Pprof = &monitoringConfig.PprofConfig{Enabled: false} } monitoringCfg.LogMetrics = logMetrics @@ -55,6 +56,9 @@ func (b *Monitor) Reload(rawConfig *config.Config) error { if cfg == nil || cfg.Settings == nil || cfg.Settings.MonitoringConfig == nil { b.config = monitoringConfig.DefaultConfig() } else { + if cfg.Settings.MonitoringConfig.Pprof == nil { + cfg.Settings.MonitoringConfig.Pprof = b.config.Pprof + } b.config = cfg.Settings.MonitoringConfig logMetrics := true if cfg.Settings.LoggingConfig != nil { @@ -123,7 +127,7 @@ func (b *Monitor) EnrichArgs(spec program.Spec, pipelineID string, args []string "-E", "http.enabled=true", "-E", "http.host="+endpoint, ) - if b.config.Pprof { + if b.config.Pprof != nil && b.config.Pprof.Enabled { appendix = append(appendix, "-E", "http.pprof.enabled=true", ) diff --git a/x-pack/elastic-agent/pkg/core/monitoring/config/config.go b/x-pack/elastic-agent/pkg/core/monitoring/config/config.go index 10f220fcc5a..3004561bd86 100644 --- a/x-pack/elastic-agent/pkg/core/monitoring/config/config.go +++ b/x-pack/elastic-agent/pkg/core/monitoring/config/config.go @@ -15,7 +15,7 @@ type MonitoringConfig struct { LogMetrics bool `yaml:"-" config:"-"` HTTP *MonitoringHTTPConfig `yaml:"http" config:"http"` Namespace string `yaml:"namespace" config:"namespace"` - Pprof bool `yaml:"pprof" config:"pprof"` + Pprof *PprofConfig `yaml:"pprof" config:"pprof"` } // MonitoringHTTPConfig is a config defining HTTP endpoint published by agent @@ -27,6 +27,13 @@ type MonitoringHTTPConfig struct { Port int `yaml:"port" config:"port" validate:"min=0,max=65535,nonzero"` } +// PprofConfig is a struct for the pprof enablement flag. +// It is a nil struct by default to allow the agent to use the a value that the user has injected into fleet.yml as the source of truth that is passed to beats +// TODO get this value from Kibana? +type PprofConfig struct { + Enabled bool `yaml:"enabled" config:"enabled"` +} + // DefaultConfig creates a config with pre-set default values. func DefaultConfig() *MonitoringConfig { return &MonitoringConfig{ @@ -39,6 +46,5 @@ func DefaultConfig() *MonitoringConfig { Port: defaultPort, }, Namespace: defaultNamespace, - Pprof: false, } }