From dd06f1f041cfbacd280d1cc24a7d71103b403675 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Wed, 3 Nov 2021 11:04:46 -0700 Subject: [PATCH 01/17] Allow -httpprof to bind to sockets/pipes --- libbeat/api/make_listener_posix.go | 2 +- libbeat/api/make_listener_windows.go | 2 +- libbeat/api/server.go | 2 +- libbeat/docs/command-reference.asciidoc | 5 +++-- libbeat/service/service.go | 4 ++-- 5 files changed, 8 insertions(+), 7 deletions(-) diff --git a/libbeat/api/make_listener_posix.go b/libbeat/api/make_listener_posix.go index 17b87135fb94..cede02f126fb 100644 --- a/libbeat/api/make_listener_posix.go +++ b/libbeat/api/make_listener_posix.go @@ -30,7 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/api/npipe" ) -func makeListener(cfg Config) (net.Listener, error) { +func MakeListener(cfg Config) (net.Listener, error) { if len(cfg.User) > 0 { return nil, errors.New("specifying a user is not supported under this platform") } diff --git a/libbeat/api/make_listener_windows.go b/libbeat/api/make_listener_windows.go index c23f2ab29e64..5793af84daae 100644 --- a/libbeat/api/make_listener_windows.go +++ b/libbeat/api/make_listener_windows.go @@ -29,7 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/api/npipe" ) -func makeListener(cfg Config) (net.Listener, error) { +func MakeListener(cfg Config) (net.Listener, error) { if len(cfg.User) > 0 && len(cfg.SecurityDescriptor) > 0 { return nil, errors.New("user and security_descriptor are mutually exclusive, define only one of them") } diff --git a/libbeat/api/server.go b/libbeat/api/server.go index 0b7e6d022dc8..bf2287f7a526 100644 --- a/libbeat/api/server.go +++ b/libbeat/api/server.go @@ -49,7 +49,7 @@ func New(log *logp.Logger, mux *http.ServeMux, config *common.Config) (*Server, return nil, err } - l, err := makeListener(cfg) + l, err := MakeListener(cfg) if err != nil { return nil, err } diff --git a/libbeat/docs/command-reference.asciidoc b/libbeat/docs/command-reference.asciidoc index 
833f698682d3..85f512d0ba8a 100644 --- a/libbeat/docs/command-reference.asciidoc +++ b/libbeat/docs/command-reference.asciidoc @@ -665,8 +665,9 @@ endif::[] *`-h, --help`*:: Shows help for the `run` command. -*`--httpprof [HOST]:PORT`*:: -Starts an http server for profiling. This option is useful for troubleshooting +*`--httpprof BIND`*:: +Starts an http server for profiling. `BIND` can be specified as a `host:port`, +unix socket, or Windows named pipe. This option is useful for troubleshooting and profiling {beatname_uc}. ifeval::["{beatname_lc}"=="packetbeat"] diff --git a/libbeat/service/service.go b/libbeat/service/service.go index 4c56cfc28a28..7901ebe469a2 100644 --- a/libbeat/service/service.go +++ b/libbeat/service/service.go @@ -22,7 +22,6 @@ import ( "expvar" "flag" "fmt" - "net" "net/http" _ "net/http/pprof" "os" @@ -32,6 +31,7 @@ import ( "sync" "syscall" + "github.com/elastic/beats/v7/libbeat/api" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" ) @@ -119,7 +119,7 @@ func BeforeRun() { mux.HandleFunc("/debug/vars", metricsHandler) // Ensure we are listening before returning - listener, err := net.Listen("tcp", *httpprof) + listener, err := api.MakeListener(api.Config{Host: *httpprof}) if err != nil { logger.Errorf("Failed to start pprof listener: %v", err) os.Exit(1) From b035fe8f5e32373f59620686d6b32a9abbc8b1eb Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Thu, 4 Nov 2021 12:31:37 -0700 Subject: [PATCH 02/17] Enable pprof debug endpoint on socket for agent and beats Force the elastic-agent and all beats that it starts to run the http/pprof listener on a local socket. 
--- libbeat/service/service.go | 29 +++++++++++++++ x-pack/elastic-agent/pkg/agent/cmd/run.go | 12 +++---- .../elastic-agent/pkg/agent/program/spec.go | 1 + .../core/monitoring/beats/beats_monitor.go | 9 +++++ .../pkg/core/monitoring/beats/monitoring.go | 35 +++++++++++++++++++ .../pkg/core/monitoring/config/config.go | 2 ++ 6 files changed, 82 insertions(+), 6 deletions(-) diff --git a/libbeat/service/service.go b/libbeat/service/service.go index 7901ebe469a2..ea47fe90dc29 100644 --- a/libbeat/service/service.go +++ b/libbeat/service/service.go @@ -132,6 +132,35 @@ func BeforeRun() { }() } +// BeforeRunWithPprof will start the http/pprof endpoints on the specified listener. +// The listener may be a host:port, unix socket, or Windows named pipe. +func BeforeRunWithPprof(listen string) { + logger := logp.NewLogger("service") + logger.Info("Start pprof endpoint") + mux := http.NewServeMux() + + // Register pprof handler + mux.HandleFunc("/debug/pprof/", func(w http.ResponseWriter, r *http.Request) { + http.DefaultServeMux.ServeHTTP(w, r) + }) + + // Register metrics handler + mux.HandleFunc("/debug/vars", metricsHandler) + + // Ensure we are listening before returning + listener, err := api.MakeListener(api.Config{Host: listen}) + if err != nil { + logger.Errorf("Failed to start pprof listener: %v", err) + os.Exit(1) + } + + go func() { + // Serve returns always a non-nil error + err := http.Serve(listener, mux) + logger.Infof("Finished pprof endpoint: %v", err) + }() +} + // metricsHandler reports expvar and all libbeat/monitoring metrics func metricsHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=utf-8") diff --git a/x-pack/elastic-agent/pkg/agent/cmd/run.go b/x-pack/elastic-agent/pkg/agent/cmd/run.go index 56a41e068957..2cde7076a5f6 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/run.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/run.go @@ -74,7 +74,12 @@ func run(streams *cli.IOStreams, override
cfgOverrider) error { } defer locker.Unlock() - service.BeforeRun() + cfg, err := loadConfig(override) + if err != nil { + return err + } + + service.BeforeRunWithPprof(beats.AgentDebugEndpoint(cfg.Settings.DownloadConfig.OS())) defer service.Cleanup() // register as a service @@ -85,11 +90,6 @@ func run(streams *cli.IOStreams, override cfgOverrider) error { } service.HandleSignals(stopBeat, cancel) - cfg, err := loadConfig(override) - if err != nil { - return err - } - logger, err := logger.NewFromConfig("", cfg.Settings.LoggingConfig, true) if err != nil { return err diff --git a/x-pack/elastic-agent/pkg/agent/program/spec.go b/x-pack/elastic-agent/pkg/agent/program/spec.go index 34ce2fec279c..4745943a1d3d 100644 --- a/x-pack/elastic-agent/pkg/agent/program/spec.go +++ b/x-pack/elastic-agent/pkg/agent/program/spec.go @@ -35,6 +35,7 @@ type Spec struct { ActionInputTypes []string `yaml:"action_input_types,omitempty"` LogPaths map[string]string `yaml:"log_paths,omitempty"` MetricEndpoints map[string]string `yaml:"metric_endpoints,omitempty"` + DebugEndpoints map[string]string `yaml:"debug_endpoints,omitempty"` Rules *transpiler.RuleList `yaml:"rules"` CheckInstallSteps *transpiler.StepList `yaml:"check_install"` PostInstallSteps *transpiler.StepList `yaml:"post_install"` diff --git a/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go b/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go index 9cf9427bf5af..1de962db543d 100644 --- a/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go +++ b/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go @@ -94,6 +94,10 @@ func (b *Monitor) generateMonitoringEndpoint(spec program.Spec, pipelineID strin return MonitoringEndpoint(spec, b.operatingSystem, pipelineID) } +func (b *Monitor) generateDebugEndpoint(spec program.Spec, pipelineID string) string { + return DebugEndpoint(spec, b.operatingSystem, pipelineID) +} + func (b *Monitor) generateLoggingFile(spec program.Spec, 
pipelineID string) string { return getLoggingFile(spec, b.operatingSystem, b.installPath, pipelineID) } @@ -125,6 +129,11 @@ func (b *Monitor) EnrichArgs(spec program.Spec, pipelineID string, args []string ) } + debugEndpoint := b.generateDebugEndpoint(spec, pipelineID) + appendix = append(appendix, + "-httpprof", debugEndpoint, + ) + loggingPath := b.generateLoggingPath(spec, pipelineID) if loggingPath != "" { logFile := spec.Cmd diff --git a/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go b/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go index 240ce5adbb22..c91460d0d919 100644 --- a/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go +++ b/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go @@ -25,6 +25,8 @@ const ( // args: pipeline name, application name agentMbEndpointFileFormatWin = `npipe:///elastic-agent` + // no format args: fixed named pipe used for the agent's own debug (pprof) endpoint + agentDebugEndpointFileFormatWin = `npipe:///elastic-agent-debug` // agentMbEndpointHTTP is used with cloud and exposes metrics on http endpoint agentMbEndpointHTTP = "http://%s:%d" ) @@ -47,6 +49,24 @@ func MonitoringEndpoint(spec program.Spec, operatingSystem, pipelineID string) s return fmt.Sprintf(`unix:///tmp/elastic-agent/%x.sock`, sha256.Sum256([]byte(path))) } +// DebugEndpoint is an endpoint where process is exposing its pprof endpoints +func DebugEndpoint(spec program.Spec, operatingSystem, pipelineID string) string { + if endpoint, ok := spec.DebugEndpoints[operatingSystem]; ok { + return endpoint + } + if operatingSystem == "windows" { + return fmt.Sprintf(mbEndpointFileFormatWin, pipelineID, spec.Cmd+"-debug") + } + // unix socket path must be less than 104 characters + path := fmt.Sprintf("unix://%s-debug.sock", filepath.Join(paths.TempDir(), pipelineID, spec.Cmd, spec.Cmd)) + if len(path) < 104 { + return path + } + // place in global /tmp (or /var/tmp on Darwin) to ensure that it's small enough to fit; current path is way too long + // for it to be
used, but needs to be unique per Agent (in the case that multiple are running) + return fmt.Sprintf(`unix:///tmp/elastic-agent/%x-debug.sock`, sha256.Sum256([]byte(path))) +} + func getLoggingFile(spec program.Spec, operatingSystem, installPath, pipelineID string) string { if path, ok := spec.LogPaths[operatingSystem]; ok { return path @@ -76,6 +96,21 @@ func AgentMonitoringEndpoint(operatingSystem string, cfg *monitoringConfig.Monit return fmt.Sprintf(`unix:///tmp/elastic-agent/%x.sock`, sha256.Sum256([]byte(path))) } +// AgentDebugEndpoint returns endpoint with exposed http/pprof endpoints for agent. +func AgentDebugEndpoint(operatingSystem string) string { + if operatingSystem == "windows" { + return agentDebugEndpointFileFormatWin + } + // unix socket path must be less than 104 characters + path := fmt.Sprintf("unix://%s-debug.sock", filepath.Join(paths.TempDir(), "elastic-agent")) + if len(path) < 104 { + return path + } + // place in global /tmp to ensure that it's small enough to fit; current path is way too long + // for it to be used, but needs to be unique per Agent (in the case that multiple are running) + return fmt.Sprintf(`unix:///tmp/elastic-agent/%x-debug.sock`, sha256.Sum256([]byte(path))) +} + // AgentPrefixedMonitoringEndpoint returns endpoint with exposed metrics for agent.
func AgentPrefixedMonitoringEndpoint(operatingSystem string, cfg *monitoringConfig.MonitoringHTTPConfig) string { return httpPlusPrefix + AgentMonitoringEndpoint(operatingSystem, cfg) diff --git a/x-pack/elastic-agent/pkg/core/monitoring/config/config.go b/x-pack/elastic-agent/pkg/core/monitoring/config/config.go index 7e1dbc77273a..10a3a6bd4a91 100644 --- a/x-pack/elastic-agent/pkg/core/monitoring/config/config.go +++ b/x-pack/elastic-agent/pkg/core/monitoring/config/config.go @@ -15,6 +15,7 @@ type MonitoringConfig struct { LogMetrics bool `yaml:"-" config:"-"` HTTP *MonitoringHTTPConfig `yaml:"http" config:"http"` Namespace string `yaml:"namespace" config:"namespace"` + Pprof bool `yaml:"pprof" config:"pprof"` } // MonitoringHTTPConfig is a config defining HTTP endpoint published by agent @@ -38,5 +39,6 @@ func DefaultConfig() *MonitoringConfig { Port: defaultPort, }, Namespace: defaultNamespace, + Pprof: true, } } From 78069aee09b81768fcdacffa6f420e464e16cf4f Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Fri, 5 Nov 2021 17:08:06 -0700 Subject: [PATCH 03/17] Add new Pprof command to control.proto --- x-pack/elastic-agent/control.proto | 42 ++ .../pkg/agent/control/proto/control.pb.go | 513 +++++++++++++++--- 2 files changed, 494 insertions(+), 61 deletions(-) diff --git a/x-pack/elastic-agent/control.proto b/x-pack/elastic-agent/control.proto index 26b6552c395c..53168f872ba0 100644 --- a/x-pack/elastic-agent/control.proto +++ b/x-pack/elastic-agent/control.proto @@ -29,6 +29,19 @@ enum ActionStatus { FAILURE = 1; } +// pprof endpoint that can be requested. +enum PprofOption { + ALLOCS = 0; + BLOCK = 1; + CMDLINE = 2; + GOROUTINE = 3; + HEAP = 4; + MUTEX = 5; + PROFILE = 6; + THREADCREATE = 7; + TRACE = 8; +} + // Empty message. message Empty { } @@ -128,6 +141,32 @@ message ProcMetaResponse { repeated ProcMeta procs = 1; } +// PprofRequest is a request for pprof data from an http/pprof endpoint.
+message PprofRequest { + // The profiles that are requested + repeated PprofOption pprofType = 1; + // A string representing a time.Duration to apply to trace, and profile options. + string traceDuration = 2; + // The application that will be profiled, if empty all applications are profiled. + string appName = 3; + // The route key to match for profiling, if empty all are profiled. + string routeKey = 4; +} + +// PprofResult is the result of a pprof request for a given application/route key. +message PprofResult { + string appName = 1; + string routeKey = 2; + PprofOption pprofType = 3; + bytes result = 4; + string error = 5; +} + +// PprofResponse is a wrapper to return all pprof responses. +message PprofResponse { + repeated PprofResult results = 1; +} + service ElasticAgentControl { // Fetches the currently running version of the Elastic Agent. rpc Version(Empty) returns (VersionResponse); @@ -143,4 +182,7 @@ service ElasticAgentControl { // Gather all running process metadata. rpc ProcMeta(Empty) returns (ProcMetaResponse); + + // Gather requested pprof data from specified applications. + rpc Pprof(PprofRequest) returns (PprofResponse); } diff --git a/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go b/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go index 2cdec52cf65c..70c66acd4abd 100644 --- a/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go +++ b/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go @@ -143,6 +143,74 @@ func (ActionStatus) EnumDescriptor() ([]byte, []int) { return file_control_proto_rawDescGZIP(), []int{1} } +// pprof endpoint that can be requested. 
+type PprofOption int32 + +const ( + PprofOption_ALLOCS PprofOption = 0 + PprofOption_BLOCK PprofOption = 1 + PprofOption_CMDLINE PprofOption = 2 + PprofOption_GOROUTINE PprofOption = 3 + PprofOption_HEAP PprofOption = 4 + PprofOption_MUTEX PprofOption = 5 + PprofOption_PROFILE PprofOption = 6 + PprofOption_THREADCREATE PprofOption = 7 + PprofOption_TRACE PprofOption = 8 +) + +// Enum value maps for PprofOption. +var ( + PprofOption_name = map[int32]string{ + 0: "ALLOCS", + 1: "BLOCK", + 2: "CMDLINE", + 3: "GOROUTINE", + 4: "HEAP", + 5: "MUTEX", + 6: "PROFILE", + 7: "THREADCREATE", + 8: "TRACE", + } + PprofOption_value = map[string]int32{ + "ALLOCS": 0, + "BLOCK": 1, + "CMDLINE": 2, + "GOROUTINE": 3, + "HEAP": 4, + "MUTEX": 5, + "PROFILE": 6, + "THREADCREATE": 7, + "TRACE": 8, + } +) + +func (x PprofOption) Enum() *PprofOption { + p := new(PprofOption) + *p = x + return p +} + +func (x PprofOption) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (PprofOption) Descriptor() protoreflect.EnumDescriptor { + return file_control_proto_enumTypes[2].Descriptor() +} + +func (PprofOption) Type() protoreflect.EnumType { + return &file_control_proto_enumTypes[2] +} + +func (x PprofOption) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use PprofOption.Descriptor instead. +func (PprofOption) EnumDescriptor() ([]byte, []int) { + return file_control_proto_rawDescGZIP(), []int{2} +} + // Empty message. type Empty struct { state protoimpl.MessageState @@ -807,6 +875,210 @@ func (x *ProcMetaResponse) GetProcs() []*ProcMeta { return nil } +// PprofRequest is a request for pprof data from and http/pprof endpoint. 
+type PprofRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The profiles that are requested + PprofType []PprofOption `protobuf:"varint,1,rep,packed,name=pprofType,proto3,enum=proto.PprofOption" json:"pprofType,omitempty"` + // A string representing a time.Duration to apply to trace, and profile options. + TraceDuration string `protobuf:"bytes,2,opt,name=traceDuration,proto3" json:"traceDuration,omitempty"` + // The application that will be profiled, if empty all applications are profiled. + AppName string `protobuf:"bytes,3,opt,name=appName,proto3" json:"appName,omitempty"` + // The route key to match for profiling, if empty all are profiled. + RouteKey string `protobuf:"bytes,4,opt,name=routeKey,proto3" json:"routeKey,omitempty"` +} + +func (x *PprofRequest) Reset() { + *x = PprofRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_control_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PprofRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PprofRequest) ProtoMessage() {} + +func (x *PprofRequest) ProtoReflect() protoreflect.Message { + mi := &file_control_proto_msgTypes[9] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PprofRequest.ProtoReflect.Descriptor instead. 
+func (*PprofRequest) Descriptor() ([]byte, []int) { + return file_control_proto_rawDescGZIP(), []int{9} +} + +func (x *PprofRequest) GetPprofType() []PprofOption { + if x != nil { + return x.PprofType + } + return nil +} + +func (x *PprofRequest) GetTraceDuration() string { + if x != nil { + return x.TraceDuration + } + return "" +} + +func (x *PprofRequest) GetAppName() string { + if x != nil { + return x.AppName + } + return "" +} + +func (x *PprofRequest) GetRouteKey() string { + if x != nil { + return x.RouteKey + } + return "" +} + +// PprofResult is the result of a pprof request for a given application/route key. +type PprofResult struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + AppName string `protobuf:"bytes,1,opt,name=appName,proto3" json:"appName,omitempty"` + RouteKey string `protobuf:"bytes,2,opt,name=routeKey,proto3" json:"routeKey,omitempty"` + PprofType PprofOption `protobuf:"varint,3,opt,name=pprofType,proto3,enum=proto.PprofOption" json:"pprofType,omitempty"` + Result []byte `protobuf:"bytes,4,opt,name=result,proto3" json:"result,omitempty"` + Error string `protobuf:"bytes,5,opt,name=error,proto3" json:"error,omitempty"` +} + +func (x *PprofResult) Reset() { + *x = PprofResult{} + if protoimpl.UnsafeEnabled { + mi := &file_control_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PprofResult) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PprofResult) ProtoMessage() {} + +func (x *PprofResult) ProtoReflect() protoreflect.Message { + mi := &file_control_proto_msgTypes[10] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PprofResult.ProtoReflect.Descriptor instead. 
+func (*PprofResult) Descriptor() ([]byte, []int) { + return file_control_proto_rawDescGZIP(), []int{10} +} + +func (x *PprofResult) GetAppName() string { + if x != nil { + return x.AppName + } + return "" +} + +func (x *PprofResult) GetRouteKey() string { + if x != nil { + return x.RouteKey + } + return "" +} + +func (x *PprofResult) GetPprofType() PprofOption { + if x != nil { + return x.PprofType + } + return PprofOption_ALLOCS +} + +func (x *PprofResult) GetResult() []byte { + if x != nil { + return x.Result + } + return nil +} + +func (x *PprofResult) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +// PprofResponse is a wrapper to return all pprof responses. +type PprofResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Results []*PprofResult `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"` +} + +func (x *PprofResponse) Reset() { + *x = PprofResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_control_proto_msgTypes[11] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PprofResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PprofResponse) ProtoMessage() {} + +func (x *PprofResponse) ProtoReflect() protoreflect.Message { + mi := &file_control_proto_msgTypes[11] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PprofResponse.ProtoReflect.Descriptor instead. 
+func (*PprofResponse) Descriptor() ([]byte, []int) { + return file_control_proto_rawDescGZIP(), []int{11} +} + +func (x *PprofResponse) GetResults() []*PprofResult { + if x != nil { + return x.Results + } + return nil +} + var File_control_proto protoreflect.FileDescriptor var file_control_proto_rawDesc = []byte{ @@ -886,37 +1158,73 @@ var file_control_proto_rawDesc = []byte{ 0x10, 0x50, 0x72, 0x6f, 0x63, 0x4d, 0x65, 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x70, 0x72, 0x6f, 0x63, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x72, 0x6f, 0x63, 0x4d, 0x65, 0x74, - 0x61, 0x52, 0x05, 0x70, 0x72, 0x6f, 0x63, 0x73, 0x2a, 0x79, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, - 0x75, 0x73, 0x12, 0x0c, 0x0a, 0x08, 0x53, 0x54, 0x41, 0x52, 0x54, 0x49, 0x4e, 0x47, 0x10, 0x00, - 0x12, 0x0f, 0x0a, 0x0b, 0x43, 0x4f, 0x4e, 0x46, 0x49, 0x47, 0x55, 0x52, 0x49, 0x4e, 0x47, 0x10, - 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x48, 0x45, 0x41, 0x4c, 0x54, 0x48, 0x59, 0x10, 0x02, 0x12, 0x0c, - 0x0a, 0x08, 0x44, 0x45, 0x47, 0x52, 0x41, 0x44, 0x45, 0x44, 0x10, 0x03, 0x12, 0x0a, 0x0a, 0x06, - 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x04, 0x12, 0x0c, 0x0a, 0x08, 0x53, 0x54, 0x4f, 0x50, - 0x50, 0x49, 0x4e, 0x47, 0x10, 0x05, 0x12, 0x0d, 0x0a, 0x09, 0x55, 0x50, 0x47, 0x52, 0x41, 0x44, - 0x49, 0x4e, 0x47, 0x10, 0x06, 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x4f, 0x4c, 0x4c, 0x42, 0x41, 0x43, - 0x4b, 0x10, 0x07, 0x2a, 0x28, 0x0a, 0x0c, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0x00, - 0x12, 0x0b, 0x0a, 0x07, 0x46, 0x41, 0x49, 0x4c, 0x55, 0x52, 0x45, 0x10, 0x01, 0x32, 0x93, 0x02, - 0x0a, 0x13, 0x45, 0x6c, 0x61, 0x73, 0x74, 0x69, 0x63, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x43, 0x6f, - 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x12, 0x2f, 0x0a, 0x07, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, - 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 
0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2d, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, - 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x15, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2f, 0x0a, 0x07, 0x52, 0x65, 0x73, 0x74, 0x61, 0x72, 0x74, - 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x65, 0x73, 0x74, 0x61, 0x72, 0x74, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x38, 0x0a, 0x07, 0x55, 0x70, 0x67, 0x72, 0x61, 0x64, - 0x65, 0x12, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x55, 0x70, 0x67, 0x72, 0x61, 0x64, - 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2e, 0x55, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x31, 0x0a, 0x08, 0x50, 0x72, 0x6f, 0x63, 0x4d, 0x65, 0x74, 0x61, 0x12, 0x0c, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x17, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x2e, 0x50, 0x72, 0x6f, 0x63, 0x4d, 0x65, 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x42, 0x22, 0x5a, 0x1d, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x67, 0x65, 0x6e, 0x74, - 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0xf8, 0x01, 0x01, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x61, 0x52, 0x05, 0x70, 0x72, 0x6f, 0x63, 0x73, 0x22, 0x9c, 0x01, 0x0a, 0x0c, 0x50, 0x70, 0x72, + 0x6f, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x30, 0x0a, 0x09, 0x70, 0x70, 0x72, + 0x6f, 0x66, 0x54, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0e, 0x32, 0x12, 0x2e, 0x70, + 
0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x70, 0x72, 0x6f, 0x66, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, + 0x52, 0x09, 0x70, 0x70, 0x72, 0x6f, 0x66, 0x54, 0x79, 0x70, 0x65, 0x12, 0x24, 0x0a, 0x0d, 0x74, + 0x72, 0x61, 0x63, 0x65, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0d, 0x74, 0x72, 0x61, 0x63, 0x65, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x70, 0x70, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x07, 0x61, 0x70, 0x70, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, + 0x6f, 0x75, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, + 0x6f, 0x75, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x22, 0xa3, 0x01, 0x0a, 0x0b, 0x50, 0x70, 0x72, 0x6f, + 0x66, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x70, 0x70, 0x4e, 0x61, + 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x70, 0x70, 0x4e, 0x61, 0x6d, + 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, + 0x09, 0x70, 0x70, 0x72, 0x6f, 0x66, 0x54, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, + 0x32, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x70, 0x72, 0x6f, 0x66, 0x4f, 0x70, + 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x70, 0x72, 0x6f, 0x66, 0x54, 0x79, 0x70, 0x65, 0x12, + 0x16, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, + 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, + 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0x3d, 0x0a, + 0x0d, 0x50, 0x70, 0x72, 0x6f, 0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2c, + 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, + 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x74, 
0x6f, 0x2e, 0x50, 0x70, 0x72, 0x6f, 0x66, 0x52, 0x65, 0x73, + 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x2a, 0x79, 0x0a, 0x06, + 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0c, 0x0a, 0x08, 0x53, 0x54, 0x41, 0x52, 0x54, 0x49, + 0x4e, 0x47, 0x10, 0x00, 0x12, 0x0f, 0x0a, 0x0b, 0x43, 0x4f, 0x4e, 0x46, 0x49, 0x47, 0x55, 0x52, + 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x48, 0x45, 0x41, 0x4c, 0x54, 0x48, 0x59, + 0x10, 0x02, 0x12, 0x0c, 0x0a, 0x08, 0x44, 0x45, 0x47, 0x52, 0x41, 0x44, 0x45, 0x44, 0x10, 0x03, + 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x04, 0x12, 0x0c, 0x0a, 0x08, + 0x53, 0x54, 0x4f, 0x50, 0x50, 0x49, 0x4e, 0x47, 0x10, 0x05, 0x12, 0x0d, 0x0a, 0x09, 0x55, 0x50, + 0x47, 0x52, 0x41, 0x44, 0x49, 0x4e, 0x47, 0x10, 0x06, 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x4f, 0x4c, + 0x4c, 0x42, 0x41, 0x43, 0x4b, 0x10, 0x07, 0x2a, 0x28, 0x0a, 0x0c, 0x41, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x55, 0x43, 0x43, 0x45, + 0x53, 0x53, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x46, 0x41, 0x49, 0x4c, 0x55, 0x52, 0x45, 0x10, + 0x01, 0x2a, 0x7f, 0x0a, 0x0b, 0x50, 0x70, 0x72, 0x6f, 0x66, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, + 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x4c, 0x4c, 0x4f, 0x43, 0x53, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, + 0x42, 0x4c, 0x4f, 0x43, 0x4b, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x43, 0x4d, 0x44, 0x4c, 0x49, + 0x4e, 0x45, 0x10, 0x02, 0x12, 0x0d, 0x0a, 0x09, 0x47, 0x4f, 0x52, 0x4f, 0x55, 0x54, 0x49, 0x4e, + 0x45, 0x10, 0x03, 0x12, 0x08, 0x0a, 0x04, 0x48, 0x45, 0x41, 0x50, 0x10, 0x04, 0x12, 0x09, 0x0a, + 0x05, 0x4d, 0x55, 0x54, 0x45, 0x58, 0x10, 0x05, 0x12, 0x0b, 0x0a, 0x07, 0x50, 0x52, 0x4f, 0x46, + 0x49, 0x4c, 0x45, 0x10, 0x06, 0x12, 0x10, 0x0a, 0x0c, 0x54, 0x48, 0x52, 0x45, 0x41, 0x44, 0x43, + 0x52, 0x45, 0x41, 0x54, 0x45, 0x10, 0x07, 0x12, 0x09, 0x0a, 0x05, 0x54, 0x52, 0x41, 0x43, 0x45, + 0x10, 0x08, 0x32, 0xc7, 0x02, 0x0a, 0x13, 0x45, 0x6c, 0x61, 0x73, 0x74, 
0x69, 0x63, 0x41, 0x67, + 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x12, 0x2f, 0x0a, 0x07, 0x56, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, + 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x56, 0x65, 0x72, 0x73, + 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2d, 0x0a, 0x06, 0x53, + 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, + 0x70, 0x74, 0x79, 0x1a, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2f, 0x0a, 0x07, 0x52, 0x65, + 0x73, 0x74, 0x61, 0x72, 0x74, 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, + 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x65, 0x73, 0x74, + 0x61, 0x72, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x38, 0x0a, 0x07, 0x55, + 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x12, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x55, + 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x55, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x31, 0x0a, 0x08, 0x50, 0x72, 0x6f, 0x63, 0x4d, 0x65, 0x74, + 0x61, 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, + 0x17, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x72, 0x6f, 0x63, 0x4d, 0x65, 0x74, 0x61, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x32, 0x0a, 0x05, 0x50, 0x70, 0x72, 0x6f, + 0x66, 0x12, 0x13, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x70, 0x72, 0x6f, 0x66, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, + 0x70, 0x72, 0x6f, 0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x22, 0x5a, 0x1d, + 0x70, 0x6b, 
0x67, 0x2f, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, + 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0xf8, 0x01, 0x01, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -931,43 +1239,52 @@ func file_control_proto_rawDescGZIP() []byte { return file_control_proto_rawDescData } -var file_control_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_control_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_control_proto_enumTypes = make([]protoimpl.EnumInfo, 3) +var file_control_proto_msgTypes = make([]protoimpl.MessageInfo, 12) var file_control_proto_goTypes = []interface{}{ (Status)(0), // 0: proto.Status (ActionStatus)(0), // 1: proto.ActionStatus - (*Empty)(nil), // 2: proto.Empty - (*VersionResponse)(nil), // 3: proto.VersionResponse - (*RestartResponse)(nil), // 4: proto.RestartResponse - (*UpgradeRequest)(nil), // 5: proto.UpgradeRequest - (*UpgradeResponse)(nil), // 6: proto.UpgradeResponse - (*ApplicationStatus)(nil), // 7: proto.ApplicationStatus - (*ProcMeta)(nil), // 8: proto.ProcMeta - (*StatusResponse)(nil), // 9: proto.StatusResponse - (*ProcMetaResponse)(nil), // 10: proto.ProcMetaResponse + (PprofOption)(0), // 2: proto.PprofOption + (*Empty)(nil), // 3: proto.Empty + (*VersionResponse)(nil), // 4: proto.VersionResponse + (*RestartResponse)(nil), // 5: proto.RestartResponse + (*UpgradeRequest)(nil), // 6: proto.UpgradeRequest + (*UpgradeResponse)(nil), // 7: proto.UpgradeResponse + (*ApplicationStatus)(nil), // 8: proto.ApplicationStatus + (*ProcMeta)(nil), // 9: proto.ProcMeta + (*StatusResponse)(nil), // 10: proto.StatusResponse + (*ProcMetaResponse)(nil), // 11: proto.ProcMetaResponse + (*PprofRequest)(nil), // 12: proto.PprofRequest + (*PprofResult)(nil), // 13: proto.PprofResult + (*PprofResponse)(nil), // 14: proto.PprofResponse } var file_control_proto_depIdxs = []int32{ 1, // 0: proto.RestartResponse.status:type_name -> proto.ActionStatus 1, // 1: 
proto.UpgradeResponse.status:type_name -> proto.ActionStatus 0, // 2: proto.ApplicationStatus.status:type_name -> proto.Status 0, // 3: proto.StatusResponse.status:type_name -> proto.Status - 7, // 4: proto.StatusResponse.applications:type_name -> proto.ApplicationStatus - 8, // 5: proto.ProcMetaResponse.procs:type_name -> proto.ProcMeta - 2, // 6: proto.ElasticAgentControl.Version:input_type -> proto.Empty - 2, // 7: proto.ElasticAgentControl.Status:input_type -> proto.Empty - 2, // 8: proto.ElasticAgentControl.Restart:input_type -> proto.Empty - 5, // 9: proto.ElasticAgentControl.Upgrade:input_type -> proto.UpgradeRequest - 2, // 10: proto.ElasticAgentControl.ProcMeta:input_type -> proto.Empty - 3, // 11: proto.ElasticAgentControl.Version:output_type -> proto.VersionResponse - 9, // 12: proto.ElasticAgentControl.Status:output_type -> proto.StatusResponse - 4, // 13: proto.ElasticAgentControl.Restart:output_type -> proto.RestartResponse - 6, // 14: proto.ElasticAgentControl.Upgrade:output_type -> proto.UpgradeResponse - 10, // 15: proto.ElasticAgentControl.ProcMeta:output_type -> proto.ProcMetaResponse - 11, // [11:16] is the sub-list for method output_type - 6, // [6:11] is the sub-list for method input_type - 6, // [6:6] is the sub-list for extension type_name - 6, // [6:6] is the sub-list for extension extendee - 0, // [0:6] is the sub-list for field type_name + 8, // 4: proto.StatusResponse.applications:type_name -> proto.ApplicationStatus + 9, // 5: proto.ProcMetaResponse.procs:type_name -> proto.ProcMeta + 2, // 6: proto.PprofRequest.pprofType:type_name -> proto.PprofOption + 2, // 7: proto.PprofResult.pprofType:type_name -> proto.PprofOption + 13, // 8: proto.PprofResponse.results:type_name -> proto.PprofResult + 3, // 9: proto.ElasticAgentControl.Version:input_type -> proto.Empty + 3, // 10: proto.ElasticAgentControl.Status:input_type -> proto.Empty + 3, // 11: proto.ElasticAgentControl.Restart:input_type -> proto.Empty + 6, // 12: 
proto.ElasticAgentControl.Upgrade:input_type -> proto.UpgradeRequest + 3, // 13: proto.ElasticAgentControl.ProcMeta:input_type -> proto.Empty + 12, // 14: proto.ElasticAgentControl.Pprof:input_type -> proto.PprofRequest + 4, // 15: proto.ElasticAgentControl.Version:output_type -> proto.VersionResponse + 10, // 16: proto.ElasticAgentControl.Status:output_type -> proto.StatusResponse + 5, // 17: proto.ElasticAgentControl.Restart:output_type -> proto.RestartResponse + 7, // 18: proto.ElasticAgentControl.Upgrade:output_type -> proto.UpgradeResponse + 11, // 19: proto.ElasticAgentControl.ProcMeta:output_type -> proto.ProcMetaResponse + 14, // 20: proto.ElasticAgentControl.Pprof:output_type -> proto.PprofResponse + 15, // [15:21] is the sub-list for method output_type + 9, // [9:15] is the sub-list for method input_type + 9, // [9:9] is the sub-list for extension type_name + 9, // [9:9] is the sub-list for extension extendee + 0, // [0:9] is the sub-list for field type_name } func init() { file_control_proto_init() } @@ -1084,14 +1401,50 @@ func file_control_proto_init() { return nil } } + file_control_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PprofRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_control_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PprofResult); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_control_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PprofResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: 
file_control_proto_rawDesc, - NumEnums: 2, - NumMessages: 9, + NumEnums: 3, + NumMessages: 12, NumExtensions: 0, NumServices: 1, }, @@ -1128,6 +1481,8 @@ type ElasticAgentControlClient interface { Upgrade(ctx context.Context, in *UpgradeRequest, opts ...grpc.CallOption) (*UpgradeResponse, error) // Gather all running process metadata. ProcMeta(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*ProcMetaResponse, error) + // Gather requested pprof data from specified applications. + Pprof(ctx context.Context, in *PprofRequest, opts ...grpc.CallOption) (*PprofResponse, error) } type elasticAgentControlClient struct { @@ -1183,6 +1538,15 @@ func (c *elasticAgentControlClient) ProcMeta(ctx context.Context, in *Empty, opt return out, nil } +func (c *elasticAgentControlClient) Pprof(ctx context.Context, in *PprofRequest, opts ...grpc.CallOption) (*PprofResponse, error) { + out := new(PprofResponse) + err := c.cc.Invoke(ctx, "/proto.ElasticAgentControl/Pprof", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // ElasticAgentControlServer is the server API for ElasticAgentControl service. type ElasticAgentControlServer interface { // Fetches the currently running version of the Elastic Agent. @@ -1195,6 +1559,8 @@ type ElasticAgentControlServer interface { Upgrade(context.Context, *UpgradeRequest) (*UpgradeResponse, error) // Gather all running process metadata. ProcMeta(context.Context, *Empty) (*ProcMetaResponse, error) + // Gather requested pprof data from specified applications. + Pprof(context.Context, *PprofRequest) (*PprofResponse, error) } // UnimplementedElasticAgentControlServer can be embedded to have forward compatible implementations. 
@@ -1216,6 +1582,9 @@ func (*UnimplementedElasticAgentControlServer) Upgrade(context.Context, *Upgrade func (*UnimplementedElasticAgentControlServer) ProcMeta(context.Context, *Empty) (*ProcMetaResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ProcMeta not implemented") } +func (*UnimplementedElasticAgentControlServer) Pprof(context.Context, *PprofRequest) (*PprofResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Pprof not implemented") +} func RegisterElasticAgentControlServer(s *grpc.Server, srv ElasticAgentControlServer) { s.RegisterService(&_ElasticAgentControl_serviceDesc, srv) @@ -1311,6 +1680,24 @@ func _ElasticAgentControl_ProcMeta_Handler(srv interface{}, ctx context.Context, return interceptor(ctx, in, info, handler) } +func _ElasticAgentControl_Pprof_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PprofRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ElasticAgentControlServer).Pprof(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/proto.ElasticAgentControl/Pprof", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ElasticAgentControlServer).Pprof(ctx, req.(*PprofRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _ElasticAgentControl_serviceDesc = grpc.ServiceDesc{ ServiceName: "proto.ElasticAgentControl", HandlerType: (*ElasticAgentControlServer)(nil), @@ -1335,6 +1722,10 @@ var _ElasticAgentControl_serviceDesc = grpc.ServiceDesc{ MethodName: "ProcMeta", Handler: _ElasticAgentControl_ProcMeta_Handler, }, + { + MethodName: "Pprof", + Handler: _ElasticAgentControl_Pprof_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "control.proto", From 131bf1f70e09c285473a79508212d73e3f2e8206 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Fri, 5 Nov 2021 
17:08:33 -0700 Subject: [PATCH 04/17] Add pprof option to diagnostics collect --- .../pkg/agent/cmd/diagnostics.go | 90 +++++- .../pkg/agent/control/client/client.go | 33 +++ .../pkg/agent/control/server/server.go | 257 ++++++++++++++---- 3 files changed, 321 insertions(+), 59 deletions(-) diff --git a/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go b/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go index e90e4ab13c15..287a9b2e390f 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/proto" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config/operations" @@ -89,12 +90,17 @@ func newDiagnosticsCollectCommandWithArgs(_ []string, streams *cli.IOStreams) *c return fmt.Errorf("unsupported output: %s", output) } - return diagnosticsCollectCmd(streams, file, output) + pprof, _ := c.Flags().GetBool("pprof") + d, _ := c.Flags().GetDuration("pprof-duration") + + return diagnosticsCollectCmd(streams, file, output, pprof, d) }, } cmd.Flags().StringP("file", "f", "", "name of the output diagnostics zip archive") cmd.Flags().String("output", "yaml", "Output the collected information in either json, or yaml (default: yaml)") // replace output flag with different options + cmd.Flags().Bool("pprof", false, "Collect all pprof data from all running applications.") + cmd.Flags().Duration("pprof-duration", time.Second*30, "The duration to collect trace and profiling data from the debug/pprof endpoints. 
(default: 30s)") return cmd } @@ -127,14 +133,18 @@ func diagnosticCmd(streams *cli.IOStreams, cmd *cobra.Command, args []string) er return outputFunc(streams.Out, diag) } -func diagnosticsCollectCmd(streams *cli.IOStreams, fileName, outputFormat string) error { +func diagnosticsCollectCmd(streams *cli.IOStreams, fileName, outputFormat string, pprof bool, pprofDur time.Duration) error { err := tryContainerLoadPaths() if err != nil { return err } ctx := handleSignal(context.Background()) - innerCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + d := time.Second * 30 + if pprof { + d = 2 * pprofDur + } + innerCtx, cancel := context.WithTimeout(ctx, d) defer cancel() diag, err := getDiagnostics(innerCtx) @@ -151,7 +161,15 @@ func diagnosticsCollectCmd(streams *cli.IOStreams, fileName, outputFormat string return fmt.Errorf("unable to gather config data: %w", err) } - err = createZip(fileName, outputFormat, diag, cfg) + var pprofData map[string][]client.ProcPProf = nil + if pprof { + pprofData, err = getAllPprof(innerCtx, pprofDur) + if err != nil { + return fmt.Errorf("unable to gather pprof data: %w", err) + } + } + + err = createZip(fileName, outputFormat, diag, cfg, pprofData) if err != nil { return fmt.Errorf("unable to create archive %q: %w", fileName, err) } @@ -242,7 +260,7 @@ func gatherConfig() (AgentConfig, error) { // // The passed DiagnosticsInfo and AgentConfig data is written in the specified output format. // Any local log files are collected and copied into the archive. 
-func createZip(fileName, outputFormat string, diag DiagnosticsInfo, cfg AgentConfig) error { +func createZip(fileName, outputFormat string, diag DiagnosticsInfo, cfg AgentConfig, pprof map[string][]client.ProcPProf) error { f, err := os.Create(fileName) if err != nil { return err @@ -298,6 +316,13 @@ func createZip(fileName, outputFormat string, diag DiagnosticsInfo, cfg AgentCon return closeHandlers(err, zw, f) } + if pprof != nil { + err := zipProfs(zw, pprof) + if err != nil { + return closeHandlers(err, zw, f) + } + } + return closeHandlers(nil, zw, f) } @@ -371,3 +396,58 @@ func closeHandlers(err error, closers ...io.Closer) error { } return mErr.ErrorOrNil() } + +func getAllPprof(ctx context.Context, d time.Duration) (map[string][]client.ProcPProf, error) { + daemon := client.New() + err := daemon.Connect(ctx) + if err != nil { + return nil, err + } + pprofTypes := []proto.PprofOption{ + proto.PprofOption_ALLOCS, + proto.PprofOption_BLOCK, + proto.PprofOption_CMDLINE, + proto.PprofOption_GOROUTINE, + proto.PprofOption_HEAP, + proto.PprofOption_MUTEX, + proto.PprofOption_PROFILE, + proto.PprofOption_THREADCREATE, + proto.PprofOption_TRACE, + } + return daemon.Pprof(ctx, d, pprofTypes, "", "") +} + +func zipProfs(zw *zip.Writer, pprof map[string][]client.ProcPProf) error { + zf, err = zw.Create("pprof/") + if err != nil { + return err + } + for pType, profs := range pprof { + zf, err = zw.Create("pprof/" + pType + "/") + if err != nil { + return err + } + for _, p := range profs { + if p.Error != "" { + zf, err = zw.Create("pprof/" + pType + "/" + p.Name + "_" + p.RouteKey + "_error.txt") + if err != nil { + return err + } + _, err = zf.Write([]byte(p.Error)) + if err != nil { + return err + } + continue + } + zf, err = zw.Create("pprof/" + pType + "/" + p.Name + "_" + p.RouteKey + ".pprof") + if err != nil { + return err + } + _, err = zf.Write(p.Results) + if err != nil { + return err + } + } + } + return nil +} diff --git 
a/x-pack/elastic-agent/pkg/agent/control/client/client.go b/x-pack/elastic-agent/pkg/agent/control/client/client.go index f7332b9896e7..cb27f81a7a87 100644 --- a/x-pack/elastic-agent/pkg/agent/control/client/client.go +++ b/x-pack/elastic-agent/pkg/agent/control/client/client.go @@ -72,6 +72,14 @@ type ProcMeta struct { Error string } +// ProcPProf returns pprof data for a process. +type ProcPProf struct { + Name string + RouteKey string + Result []byte + Error string +} + // AgentStatus is the current status of the Elastic Agent. type AgentStatus struct { Status Status @@ -247,3 +255,28 @@ func (c *client) ProcMeta(ctx context.Context) ([]ProcMeta, error) { } return procMeta, nil } + +func (c *client) Pprof(ctx context.Context, d time.Duration, pprofTypes []proto.PprofOption, appName, routeKey string) (map[string][]ProcPProf, error) { + resp, err := c.client.Pprof(ctx, &proto.PprofRequest{ + PprofType: pprofTypes, + TraceDuration: d.String(), + AppName: appName, + RouteKey: routeKey, + }) + if err != nil { + return nil, err + } + res := map[string][]ProcPProf{} + for _, pType := range pprofTypes { + res[pType.String()] = make([]ProcPProf, 0) + } + for _, r := range resp.Results { + res[r.PprofType.String()] = append(res[r.PprofType.String()], ProcPProf{ + Name: r.AppName, + RouteKey: r.RouteKey, + Results: r.Result, + Error: r.Error, + }) + } + return res, nil +} diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go index 072b212a7718..4b6ef77860ee 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/server.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -7,6 +7,8 @@ package server import ( "context" "encoding/json" + "fmt" + "io" "net" "net/http" "runtime" @@ -210,70 +212,217 @@ func (s *Server) ProcMeta(ctx context.Context, _ *proto.Empty) (*proto.ProcMetaR specs := sp.Specs() for n, spec := range specs { - procMeta := &proto.ProcMeta{ - Name: n, - RouteKey: rk, - 
} - - client := http.Client{ - Timeout: time.Second * 5, - } endpoint := monitoring.MonitoringEndpoint(spec, runtime.GOOS, rk) - if strings.HasPrefix(endpoint, "unix://") { - client.Transport = &http.Transport{ - Proxy: nil, - DialContext: socket.DialContext(strings.TrimPrefix(endpoint, "unix://")), - } - endpoint = "unix" - } else if strings.HasPrefix(endpoint, "npipe://") { - client.Transport = &http.Transport{ - Proxy: nil, - DialContext: socket.DialContext(strings.TrimPrefix(endpoint, "npipe:///")), - } - endpoint = "npipe" - } + client := newSocketRequester(time.Second*5, n, rk, endpoint) + + procMeta := client.ProcMeta(ctx) + resp.Procs = append(resp.Procs, procMeta) + } + } + return resp, nil +} + +func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.PprofResponse, error) { + if s.routeFn == nil { + return nil, errors.New("route function is nil") + } + + dur, err := time.ParseDuration(req.TraceDuration) + if err != nil { + return nil, fmt.Errorf("unable to parse trace duration: %w", err) + } + + resp := &proto.PprofResonse{ + Results: []*proto.PprofResult{}, + } + + var wg sync.WaitGroup + ch := make(chan *proto.PprofResult, 1) + + routes := s.routeFn() + for _, rk := range routes.Keys() { + // Skip the rk if it does not match and one has been requested. + if req.RouteKey != "" && req.RouteKey != rk { + continue + } + programs, ok := routes.Get(rk) + if !ok { + s.logger.With("route_key", rk).Warn("Unable to retrieve route.") + continue + } - res, err := client.Get("http://" + endpoint + "/") - if err != nil { - procMeta.Error = err.Error() - resp.Procs = append(resp.Procs, procMeta) - continue + sp, ok := programs.(specer) + if !ok { + s.logger.With("route_key", rk, "route", programs).Warn("Unable to cast route as specer.") + continue + } + specs := sp.Specs() + for n, spec := range specs { + if req.AppName != "" && req.AppName != n { + continue // Skip the app if it does not match and one has been requested. 
} - if res.StatusCode != 200 { - procMeta.Error = "response status is: " + res.Status - resp.Procs = append(resp.Procs, procMeta) - continue + endpoint := monitoring.DebugEndpoint(spec, runtime.GOOS, rk) + c := newSocketRequester(dur*2, n, rk, endpoint) + + // Launch a concurrent goroutine to gather all pprof endpoints from a socket. + for _, opt := range req.PprofType { + wg.Add(1) + go func() { + res := c.GetPprof(ctx, opt, dur) + ch <- res + wg.Done() + }() } + } + } - bi := &BeatInfo{} - dec := json.NewDecoder(res.Body) - if err := dec.Decode(bi); err != nil { - res.Body.Close() - procMeta.Error = err.Error() - resp.Procs = append(resp.Procs, procMeta) - continue - } - res.Body.Close() - - procMeta.Process = bi.Beat - procMeta.Hostname = bi.Hostname - procMeta.Id = bi.ID - procMeta.EphemeralId = bi.EphemeralID - procMeta.Version = bi.Version - procMeta.BuildCommit = bi.Commit - procMeta.BuildTime = bi.Time - procMeta.Username = bi.Username - procMeta.UserId = bi.UserID - procMeta.UserGid = bi.GroupID - procMeta.Architecture = bi.BinaryArch - procMeta.ElasticLicensed = bi.ElasticLicensed + // wait for the waitgroup to be done and close the channel + go func() { + wg.Wait() + close(ch) + }() - resp.Procs = append(resp.Procs, procMeta) - } + // gather all results from channel until closed. 
+ for res := range ch { + resp.Results = append(resp.Results, res) } return resp, nil } +// socketRequester is a struct to gather (diagnostics) data from a socket opened by elastic-agent or one if it's processes +type socketRequester struct { + c http.Client + endpoint string + appName string + routeKey string +} + +func newSocketRequester(timeout time.Duration, appName, routeKey, endpoint string) *socketRequester { + c := http.Client{Timeout: timeout} + if strings.HasPrefix(endpoint, "unix://") { + c.Transport = &http.Transport{ + Proxy: nil, + DialContext: socket.DialContext(strings.TrimPrefix(endpoint, "unix://")), + } + endpoint = "unix" + } else if strings.HasPrefix(endpoint, "npipe://") { + c.Transport = &http.Transport{ + Proxy: nil, + DialContext: socket.DialContext(strings.TrimPrefix(endpoint, "npipe:///")), + } + endpoint = "npipe" + } + return &socketRequester{ + c: c, + appName: appName, + routeKey: routeKey, + endpoint: endpoint, + } +} + +// getPath creates a get request for the specified path. +// Will return an error if that status code is not 200. +func (r *socketRequester) getPath(ctx context.Context, path string) (*http.Response, error) { + req, err := http.NewRequest("GET", "http://"+r.endpoint+path, nil) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + res, err := r.c.Do(req) + if err != nil { + return nil, err + } + if res.StatusCode != 200 { + res.Body.Close() + return nil, fmt.Errorf("response status is %d", res.StatusCode) + } + return res, nil + +} + +// ProcMeta will return process metadata by querying the "/" path. 
+func (r *socketRequester) ProcMeta(ctx context.Context) *proto.ProcMeta { + pm := &proto.ProcMeta{ + Name: r.appName, + RouteKey: r.routeKey, + } + + res, err := r.c.getPath(ctx, "/") + if err != nil { + pm.Error = err.Error() + return pm + } + defer res.Body.Close() + + bi := &BeatInfo{} + dec := json.NewDecoder(res.Body) + if err := dec.Decode(bi); err != nil { + pm.Error = err.Error() + return pm + } + + pm.Process = bi.Beat + pm.Hostname = bi.Hostname + pm.Id = bi.ID + pm.EphemeralId = bi.EphemeralID + pm.Version = bi.Version + pm.BuildCommit = bi.Commit + pm.BuildTime = bi.Time + pm.Username = bi.Username + pm.UserId = bi.UserID + pm.UserGid = bi.GroupID + pm.Architecture = bi.BinaryArch + pm.ElasticLicensed = bi.ElasticLicensed + + return pm +} + +var pprofEndopoints = map[proto.PprofOption]string{ + proto.PprofOption_ALLOCS: "/debug/pprof/allocs", + proto.PprofOption_BLOCK: "/debug/pprof/block", + proto.PprofOption_CMDLINE: "/debug/pprof/cmdline", + proto.PprofOption_GOROUTINE: "/debug/pprof/goroutine", + proto.PprofOption_HEAP: "/debug/pprof/heap", + proto.PprofOption_MUTEX: "/debug/pprof/mutex", + proto.PprofOption_PROFILE: "/debug/pprof/profile", + proto.PprofOption_THREADCREATE: "/debug/pprof/threadcreate", + proto.PprofOption_TRACE: "/debug/pprof/trace", +} + +// GetProf will gather pprof data specified by the option. 
+func (r *socketRequester) GetPprof(ctx context.Context, opt proto.PprofOption, dur time.Duration) *proto.PprofResult { + res := &proto.PprofResult{ + AppName: r.appName, + RouteKey: r.routeKey, + PprofType: opt, + } + + path, ok := pprofEndpoints[opt] + if !ok { + res.Error = "unknown path for option" + return res + } + + if opt == proto.PprofOption_PROFILE || opt == proto.PprofOption_TRACE { + path += fmt.Sprintf("?seconds=%0.f", dur.Seconds()) + } + + resp, err := r.c.getPath(ctx, path) + if err != nil { + res.Error = err.Error() + return res + } + defer resp.Body.Close() + + p, err := io.ReadAll(resp.Body) + if err != nil { + res.Error = err.Error() + return res + } + res.Result = p + return res +} + type upgradeRequest struct { *proto.UpgradeRequest } From c61c0100d8d4382c73504f29e9dfda568cea3269 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Mon, 8 Nov 2021 10:21:59 -0800 Subject: [PATCH 05/17] Fix linting issues --- x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go | 4 ++-- .../pkg/agent/control/client/client.go | 4 +++- .../pkg/agent/control/server/server.go | 13 +++++++------ 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go b/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go index 287a9b2e390f..359d4348ed52 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go @@ -418,7 +418,7 @@ func getAllPprof(ctx context.Context, d time.Duration) (map[string][]client.Proc } func zipProfs(zw *zip.Writer, pprof map[string][]client.ProcPProf) error { - zf, err = zw.Create("pprof/") + zf, err := zw.Create("pprof/") if err != nil { return err } @@ -443,7 +443,7 @@ func zipProfs(zw *zip.Writer, pprof map[string][]client.ProcPProf) error { if err != nil { return err } - _, err = zf.Write(p.Results) + _, err = zf.Write(p.Result) if err != nil { return err } diff --git a/x-pack/elastic-agent/pkg/agent/control/client/client.go 
b/x-pack/elastic-agent/pkg/agent/control/client/client.go index cb27f81a7a87..6add5470b7ad 100644 --- a/x-pack/elastic-agent/pkg/agent/control/client/client.go +++ b/x-pack/elastic-agent/pkg/agent/control/client/client.go @@ -103,6 +103,8 @@ type Client interface { Upgrade(ctx context.Context, version string, sourceURI string) (string, error) // ProcMeta gathers running process meta-data. ProcMeta(ctx context.Context) ([]ProcMeta, error) + // Ppro + Pprof(ctx context.Context, d time.Duration, pprofTypes []proto.PprofOption, appName, routeKey string) (map[string][]ProcPProf, error) } // client manages the state and communication to the Elastic Agent. @@ -274,7 +276,7 @@ func (c *client) Pprof(ctx context.Context, d time.Duration, pprofTypes []proto. res[r.PprofType.String()] = append(res[r.PprofType.String()], ProcPProf{ Name: r.AppName, RouteKey: r.RouteKey, - Results: r.Result, + Result: r.Result, Error: r.Error, }) } diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go index 4b6ef77860ee..7f595b0f7080 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/server.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -222,6 +222,7 @@ func (s *Server) ProcMeta(ctx context.Context, _ *proto.Empty) (*proto.ProcMetaR return resp, nil } +// Pprof returns /debug/pprof data for the requested applicaiont-route_key or all running applications. 
func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.PprofResponse, error) { if s.routeFn == nil { return nil, errors.New("route function is nil") @@ -232,7 +233,7 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr return nil, fmt.Errorf("unable to parse trace duration: %w", err) } - resp := &proto.PprofResonse{ + resp := &proto.PprofResponse{ Results: []*proto.PprofResult{}, } @@ -267,11 +268,11 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr // Launch a concurrent goroutine to gather all pprof endpoints from a socket. for _, opt := range req.PprofType { wg.Add(1) - go func() { + go func(opt proto.PprofOption) { res := c.GetPprof(ctx, opt, dur) ch <- res wg.Done() - }() + }(opt) } } } @@ -347,7 +348,7 @@ func (r *socketRequester) ProcMeta(ctx context.Context) *proto.ProcMeta { RouteKey: r.routeKey, } - res, err := r.c.getPath(ctx, "/") + res, err := r.getPath(ctx, "/") if err != nil { pm.Error = err.Error() return pm @@ -377,7 +378,7 @@ func (r *socketRequester) ProcMeta(ctx context.Context) *proto.ProcMeta { return pm } -var pprofEndopoints = map[proto.PprofOption]string{ +var pprofEndpoints = map[proto.PprofOption]string{ proto.PprofOption_ALLOCS: "/debug/pprof/allocs", proto.PprofOption_BLOCK: "/debug/pprof/block", proto.PprofOption_CMDLINE: "/debug/pprof/cmdline", @@ -407,7 +408,7 @@ func (r *socketRequester) GetPprof(ctx context.Context, opt proto.PprofOption, d path += fmt.Sprintf("?seconds=%0.f", dur.Seconds()) } - resp, err := r.c.getPath(ctx, path) + resp, err := r.getPath(ctx, path) if err != nil { res.Error = err.Error() return res From e3fba764efacfcd2fda5f580bae89ca78482cf99 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Mon, 8 Nov 2021 12:11:57 -0800 Subject: [PATCH 06/17] Add diagonstics pprof command allow pprof to collect from agent --- .../pkg/agent/cmd/diagnostics.go | 87 ++++++++++++++++++- .../pkg/agent/control/client/client.go | 1 + 
.../pkg/agent/control/server/server.go | 15 ++++ 3 files changed, 102 insertions(+), 1 deletion(-) diff --git a/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go b/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go index 359d4348ed52..e7d8614b6bbd 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go @@ -64,6 +64,7 @@ func newDiagnosticsCommand(s []string, streams *cli.IOStreams) *cobra.Command { cmd.Flags().String("output", "human", "Output the diagnostics information in either human, json, or yaml (default: human)") cmd.AddCommand(newDiagnosticsCollectCommandWithArgs(s, streams)) + cmd.AddCommand(newDiagnosticsPprofCommandWithArgs(s, streams)) return cmd } @@ -73,7 +74,7 @@ func newDiagnosticsCollectCommandWithArgs(_ []string, streams *cli.IOStreams) *c Use: "collect", Short: "Collect diagnostics information from the elastic-agent and write it to a zip archive.", Long: "Collect diagnostics information from the elastic-agent and write it to a zip archive.\nNote that any credentials will appear in plain text.", - Args: cobra.MaximumNArgs(1), + Args: cobra.MaximumNArgs(3), RunE: func(c *cobra.Command, args []string) error { file, _ := c.Flags().GetString("file") @@ -105,6 +106,32 @@ func newDiagnosticsCollectCommandWithArgs(_ []string, streams *cli.IOStreams) *c return cmd } +func newDiagnosticsPprofCommandWithArgs(_ []string, streams *cli.IOStreams) *cobra.Command { + cmd := &cobra.Command{ + Use: "pprof", + Short: "Collect pprof information from a running process.", + Long: "Collect pprof information from the elastic-agent or one of its processes and write to stdout or a file.", + Args: cobra.MaximumNArgs(5), + RunE: func(c *cobra.Command, args []string) error { + file, _ := c.Flags().GetString("file") + pprofType, _ := c.Flags().GetString("pprof-type") + d, _ := c.Flags().GetDuration("pprof-duration") + pprofApp, _ := c.Flags().GetString("pprof-application") + pprofRK, _ := 
c.Flags().GetString("pprof-route-key") + + return diagnosticsPprofCmd(streams, d, file, pprofType, pprofApp, pprofRK) + }, + } + + cmd.Flags().StringP("file", "f", "", "name of the output file, stdout if unspecified.") + cmd.Flags().String("pprof-type", "profile", "Collect all pprof data from all running applications. Select one of [allocs, block, cmdline, goroutine, heap, mutex, profile, threadcreate, trace]") + cmd.Flags().Duration("pprof-duration", time.Second*30, "The duration to collect trace and profiling data from the debug/pprof endpoints. (default: 30s)") + cmd.Flags().String("pprof-application", "elastic-agent", "Application name to collect pprof data from.") + cmd.Flags().String("pprof-route-key", "default", "Route key to collect pprof data from.") + + return cmd +} + func diagnosticCmd(streams *cli.IOStreams, cmd *cobra.Command, args []string) error { err := tryContainerLoadPaths() if err != nil { @@ -178,6 +205,64 @@ func diagnosticsCollectCmd(streams *cli.IOStreams, fileName, outputFormat string return nil } +func diagnosticsPprofCmd(streams *cli.IOStreams, dur time.Duration, outFile, pType, appName, rk string) error { + pt, ok := proto.PprofOption_value[strings.ToUpper(pType)] + if !ok { + return fmt.Errorf("unknown pprof-type %q, select one of [allocs, block, cmdline, goroutine, heap, mutex, profile, threadcreate, trace]", pType) + } + + // the elastic-agent application does not have a route key + if appName == "elastic-agent" { + rk = "" + } + + ctx := handleSignal(context.Background()) + innerCtx, cancel := context.WithTimeout(ctx, dur*2) + defer cancel() + + daemon := client.New() + err := daemon.Connect(ctx) + if err != nil { + return err + } + + pprofData, err := daemon.Pprof(innerCtx, dur, []proto.PprofOption{proto.PprofOption(pt)}, appName, rk) + if err != nil { + return err + } + + // validate response + pArr, ok := pprofData[proto.PprofOption_name[pt]] + if !ok { + return fmt.Errorf("route key %q not found in response data (map length: 
%d)", rk, len(pprofData)) + } + if len(pArr) != 1 { + return fmt.Errorf("route key application length 1 expected, recieved %d", len(pArr)) + } + res := pArr[0] + + if res.Error != "" { + return fmt.Errorf(res.Error) + } + + // handle result + if outFile != "" { + f, err := os.Create(outFile) + if err != nil { + return err + } + defer f.Close() + _, err = f.Write(res.Result) + if err != nil { + return err + } + fmt.Fprintf(streams.Out, "pprof data written to %s\n", outFile) + return nil + } + streams.Out.Write(res.Result) + return nil +} + func getDiagnostics(ctx context.Context) (DiagnosticsInfo, error) { daemon := client.New() diag := DiagnosticsInfo{} diff --git a/x-pack/elastic-agent/pkg/agent/control/client/client.go b/x-pack/elastic-agent/pkg/agent/control/client/client.go index 6add5470b7ad..dbc586ea81ad 100644 --- a/x-pack/elastic-agent/pkg/agent/control/client/client.go +++ b/x-pack/elastic-agent/pkg/agent/control/client/client.go @@ -258,6 +258,7 @@ func (c *client) ProcMeta(ctx context.Context) ([]ProcMeta, error) { return procMeta, nil } +// Pprof gathers /debug/pprof data and returns a map of pprof-type: ProcPProf data func (c *client) Pprof(ctx context.Context, d time.Duration, pprofTypes []proto.PprofOption, appName, routeKey string) (map[string][]ProcPProf, error) { resp, err := c.client.Pprof(ctx, &proto.PprofRequest{ PprofType: pprofTypes, diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go index 7f595b0f7080..250a9911d4b3 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/server.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/beats" monitoring 
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/beats" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/socket" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" @@ -240,6 +241,20 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr var wg sync.WaitGroup ch := make(chan *proto.PprofResult, 1) + // retrieve elastic-agent pprof data if requested or application is unspecified. + if req.AppName == "" || req.AppName == "elastic-agent" { + endpoint := beats.AgentDebugEndpoint(runtime.GOOS) + c := newSocketRequester(dur*2, "elastic-agent", "", endpoint) + for _, opt := range req.PprofType { + wg.Add(1) + go func(opt proto.PprofOption) { + res := c.GetPprof(ctx, opt, dur) + ch <- res + wg.Done() + }(opt) + } + } + routes := s.routeFn() for _, rk := range routes.Keys() { // Skip the rk if it does not match and one has been requested. From dc66dbdcf993a85ac7156723a9980c6d3b9ecf2d Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Wed, 17 Nov 2021 14:00:40 -0800 Subject: [PATCH 07/17] Revert debug socket changes --- libbeat/api/make_listener_posix.go | 2 +- libbeat/api/make_listener_windows.go | 2 +- libbeat/api/server.go | 2 +- libbeat/docs/command-reference.asciidoc | 5 ++- libbeat/service/service.go | 33 ++--------------- x-pack/elastic-agent/pkg/agent/cmd/run.go | 12 +++---- .../pkg/agent/control/server/server.go | 4 +-- .../elastic-agent/pkg/agent/program/spec.go | 1 - .../core/monitoring/beats/beats_monitor.go | 9 ----- .../pkg/core/monitoring/beats/monitoring.go | 35 ------------------- .../pkg/core/monitoring/config/config.go | 2 -- 11 files changed, 15 insertions(+), 92 deletions(-) diff --git a/libbeat/api/make_listener_posix.go b/libbeat/api/make_listener_posix.go index cede02f126fb..17b87135fb94 100644 --- a/libbeat/api/make_listener_posix.go +++ b/libbeat/api/make_listener_posix.go @@ -30,7 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/api/npipe" ) -func MakeListener(cfg 
Config) (net.Listener, error) { +func makeListener(cfg Config) (net.Listener, error) { if len(cfg.User) > 0 { return nil, errors.New("specifying a user is not supported under this platform") } diff --git a/libbeat/api/make_listener_windows.go b/libbeat/api/make_listener_windows.go index 5793af84daae..c23f2ab29e64 100644 --- a/libbeat/api/make_listener_windows.go +++ b/libbeat/api/make_listener_windows.go @@ -29,7 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/api/npipe" ) -func MakeListener(cfg Config) (net.Listener, error) { +func makeListener(cfg Config) (net.Listener, error) { if len(cfg.User) > 0 && len(cfg.SecurityDescriptor) > 0 { return nil, errors.New("user and security_descriptor are mutually exclusive, define only one of them") } diff --git a/libbeat/api/server.go b/libbeat/api/server.go index bf2287f7a526..0b7e6d022dc8 100644 --- a/libbeat/api/server.go +++ b/libbeat/api/server.go @@ -49,7 +49,7 @@ func New(log *logp.Logger, mux *http.ServeMux, config *common.Config) (*Server, return nil, err } - l, err := MakeListener(cfg) + l, err := makeListener(cfg) if err != nil { return nil, err } diff --git a/libbeat/docs/command-reference.asciidoc b/libbeat/docs/command-reference.asciidoc index 85f512d0ba8a..833f698682d3 100644 --- a/libbeat/docs/command-reference.asciidoc +++ b/libbeat/docs/command-reference.asciidoc @@ -665,9 +665,8 @@ endif::[] *`-h, --help`*:: Shows help for the `run` command. -*`--httpprof BIND`*:: -Starts an http server for profiling. `BIND` can be specified as a `host:port`, -unix socket, or Windows named pipe. This option is useful for troubleshooting +*`--httpprof [HOST]:PORT`*:: +Starts an http server for profiling. This option is useful for troubleshooting and profiling {beatname_uc}. 
ifeval::["{beatname_lc}"=="packetbeat"] diff --git a/libbeat/service/service.go b/libbeat/service/service.go index ea47fe90dc29..4c56cfc28a28 100644 --- a/libbeat/service/service.go +++ b/libbeat/service/service.go @@ -22,6 +22,7 @@ import ( "expvar" "flag" "fmt" + "net" "net/http" _ "net/http/pprof" "os" @@ -31,7 +32,6 @@ import ( "sync" "syscall" - "github.com/elastic/beats/v7/libbeat/api" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" ) @@ -119,36 +119,7 @@ func BeforeRun() { mux.HandleFunc("/debug/vars", metricsHandler) // Ensure we are listening before returning - listener, err := api.MakeListener(api.Config{Host: *httpprof}) - if err != nil { - logger.Errorf("Failed to start pprof listener: %v", err) - os.Exit(1) - } - - go func() { - // Serve returns always a non-nil error - err := http.Serve(listener, mux) - logger.Infof("Finished pprof endpoint: %v", err) - }() -} - -// BeforeRunWithPprof will start the http/pprof endpoints on the specified listener. -// The listener may be a host:socket, unix port, or Windows named pipe. 
-func BeforeRunWithPprof(listen string) { - logger := logp.NewLogger("service") - logger.Info("Start pprof endpoint") - mux := http.NewServeMux() - - // Register pprof handler - mux.HandleFunc("/debug/pprof/", func(w http.ResponseWriter, r *http.Request) { - http.DefaultServeMux.ServeHTTP(w, r) - }) - - // Register metrics handler - mux.HandleFunc("/debug/vars", metricsHandler) - - // Ensure we are listening before returning - listener, err := api.MakeListener(api.Config{Host: listen}) + listener, err := net.Listen("tcp", *httpprof) if err != nil { logger.Errorf("Failed to start pprof listener: %v", err) os.Exit(1) diff --git a/x-pack/elastic-agent/pkg/agent/cmd/run.go b/x-pack/elastic-agent/pkg/agent/cmd/run.go index 2cde7076a5f6..56a41e068957 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/run.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/run.go @@ -74,12 +74,7 @@ func run(streams *cli.IOStreams, override cfgOverrider) error { } defer locker.Unlock() - cfg, err := loadConfig(override) - if err != nil { - return err - } - - service.BeforeRunWithPprof(beats.AgentDebugEndpoint(cfg.Settings.DownloadConfig.OS())) + service.BeforeRun() defer service.Cleanup() // register as a service @@ -90,6 +85,11 @@ func run(streams *cli.IOStreams, override cfgOverrider) error { } service.HandleSignals(stopBeat, cancel) + cfg, err := loadConfig(override) + if err != nil { + return err + } + logger, err := logger.NewFromConfig("", cfg.Settings.LoggingConfig, true) if err != nil { return err diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go index 250a9911d4b3..98561774288a 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/server.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -243,7 +243,7 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr // retrieve elastic-agent pprof data if requested or application is unspecified. 
if req.AppName == "" || req.AppName == "elastic-agent" { - endpoint := beats.AgentDebugEndpoint(runtime.GOOS) + endpoint := beats.AgentMonitoringEndpoint(runtime.GOOS) c := newSocketRequester(dur*2, "elastic-agent", "", endpoint) for _, opt := range req.PprofType { wg.Add(1) @@ -277,7 +277,7 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr if req.AppName != "" && req.AppName != n { continue // Skip the app if it does not match and one has been requested. } - endpoint := monitoring.DebugEndpoint(spec, runtime.GOOS, rk) + endpoint := monitoring.MonitoringEndpoint(spec, runtime.GOOS, rk) c := newSocketRequester(dur*2, n, rk, endpoint) // Launch a concurrent goroutine to gather all pprof endpoints from a socket. diff --git a/x-pack/elastic-agent/pkg/agent/program/spec.go b/x-pack/elastic-agent/pkg/agent/program/spec.go index 4745943a1d3d..34ce2fec279c 100644 --- a/x-pack/elastic-agent/pkg/agent/program/spec.go +++ b/x-pack/elastic-agent/pkg/agent/program/spec.go @@ -35,7 +35,6 @@ type Spec struct { ActionInputTypes []string `yaml:"action_input_types,omitempty"` LogPaths map[string]string `yaml:"log_paths,omitempty"` MetricEndpoints map[string]string `yaml:"metric_endpoints,omitempty"` - DebugEndpoints map[string]string `yaml:"debug_endpoints,omitempty"` Rules *transpiler.RuleList `yaml:"rules"` CheckInstallSteps *transpiler.StepList `yaml:"check_install"` PostInstallSteps *transpiler.StepList `yaml:"post_install"` diff --git a/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go b/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go index 1ee1cef0d6a3..f96f5ecf9164 100644 --- a/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go +++ b/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go @@ -94,10 +94,6 @@ func (b *Monitor) generateMonitoringEndpoint(spec program.Spec, pipelineID strin return MonitoringEndpoint(spec, b.operatingSystem, pipelineID) } -func (b *Monitor) 
generateDebugEndpoint(spec program.Spec, pipelineID string) string { - return DebugEndpoint(spec, b.operatingSystem, pipelineID) -} - func (b *Monitor) generateLoggingFile(spec program.Spec, pipelineID string) string { return getLoggingFile(spec, b.operatingSystem, b.installPath, pipelineID) } @@ -129,11 +125,6 @@ func (b *Monitor) EnrichArgs(spec program.Spec, pipelineID string, args []string ) } - debugEndpoint := b.generateDebugEndpoint(spec, pipelineID) - appendix = append(appendix, - "-httpprof", debugEndpoint, - ) - loggingPath := b.generateLoggingPath(spec, pipelineID) if loggingPath != "" { logFile := spec.Cmd diff --git a/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go b/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go index c91460d0d919..240ce5adbb22 100644 --- a/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go +++ b/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go @@ -25,8 +25,6 @@ const ( // args: pipeline name, application name agentMbEndpointFileFormatWin = `npipe:///elastic-agent` - // args: pipeline name, application name debug - agentDebugEndpointFileFormatWin = `npipe:///elastic-agent-debug` // agentMbEndpointHTTP is used with cloud and exposes metrics on http endpoint agentMbEndpointHTTP = "http://%s:%d" ) @@ -49,24 +47,6 @@ func MonitoringEndpoint(spec program.Spec, operatingSystem, pipelineID string) s return fmt.Sprintf(`unix:///tmp/elastic-agent/%x.sock`, sha256.Sum256([]byte(path))) } -// DebugEndpoint is an endpoint where process is exposing its pprof endpoints -func DebugEndpoint(spec program.Spec, operatingSystem, pipelineID string) string { - if endpoint, ok := spec.DebugEndpoints[operatingSystem]; ok { - return endpoint - } - if operatingSystem == "windows" { - return fmt.Sprintf(mbEndpointFileFormatWin, pipelineID, spec.Cmd+"-debug") - } - // unix socket path must be less than 104 characters - path := fmt.Sprintf("unix://%s-debug.sock", filepath.Join(paths.TempDir(), pipelineID, 
spec.Cmd, spec.Cmd)) - if len(path) < 104 { - return path - } - // place in global /tmp (or /var/tmp on Darwin) to ensure that its small enough to fit; current path is way to long - // for it to be used, but needs to be unique per Agent (in the case that multiple are running) - return fmt.Sprintf(`unix:///tmp/elastic-agent/%x-debug.sock`, sha256.Sum256([]byte(path))) -} - func getLoggingFile(spec program.Spec, operatingSystem, installPath, pipelineID string) string { if path, ok := spec.LogPaths[operatingSystem]; ok { return path @@ -96,21 +76,6 @@ func AgentMonitoringEndpoint(operatingSystem string, cfg *monitoringConfig.Monit return fmt.Sprintf(`unix:///tmp/elastic-agent/%x.sock`, sha256.Sum256([]byte(path))) } -// AgentDebugEndpoint returns endpoint with exposed http/pprof endpoints for agent. -func AgentDebugEndpoint(operatingSystem string) string { - if operatingSystem == "windows" { - return agentDebugEndpointFileFormatWin - } - // unix socket path must be less than 104 characters - path := fmt.Sprintf("unix://%s-debug.sock", filepath.Join(paths.TempDir(), "elastic-agent")) - if len(path) < 104 { - return path - } - // place in global /tmp to ensure that its small enough to fit; current path is way to long - // for it to be used, but needs to be unique per Agent (in the case that multiple are running) - return fmt.Sprintf(`unix:///tmp/elastic-agent/%x-debug.sock`, sha256.Sum256([]byte(path))) -} - // AgentPrefixedMonitoringEndpoint returns endpoint with exposed metrics for agent. 
func AgentPrefixedMonitoringEndpoint(operatingSystem string, cfg *monitoringConfig.MonitoringHTTPConfig) string { return httpPlusPrefix + AgentMonitoringEndpoint(operatingSystem, cfg) diff --git a/x-pack/elastic-agent/pkg/core/monitoring/config/config.go b/x-pack/elastic-agent/pkg/core/monitoring/config/config.go index 10a3a6bd4a91..7e1dbc77273a 100644 --- a/x-pack/elastic-agent/pkg/core/monitoring/config/config.go +++ b/x-pack/elastic-agent/pkg/core/monitoring/config/config.go @@ -15,7 +15,6 @@ type MonitoringConfig struct { LogMetrics bool `yaml:"-" config:"-"` HTTP *MonitoringHTTPConfig `yaml:"http" config:"http"` Namespace string `yaml:"namespace" config:"namespace"` - Pprof bool `yaml:"pprof" config:"pprof"` } // MonitoringHTTPConfig is a config defining HTTP endpoint published by agent @@ -39,6 +38,5 @@ func DefaultConfig() *MonitoringConfig { Port: defaultPort, }, Namespace: defaultNamespace, - Pprof: true, } } From bbaf43a37592a50792725cb65685579cb7f0d5a5 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Wed, 17 Nov 2021 14:11:16 -0800 Subject: [PATCH 08/17] Cleanup timeout handling Change pprof timeouts from 2*pprofDur to 30s+pprofDur. Remove timeouts from the socket requester client as cancellations for long running requests will be handled by the passed ctx. 
--- .../pkg/agent/cmd/diagnostics.go | 5 ++-- .../pkg/agent/control/client/client.go | 2 +- .../pkg/agent/control/server/server.go | 25 ++++++++++--------- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go b/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go index e7d8614b6bbd..9d50a1439c42 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go @@ -168,8 +168,9 @@ func diagnosticsCollectCmd(streams *cli.IOStreams, fileName, outputFormat string ctx := handleSignal(context.Background()) d := time.Second * 30 + // Add more time to the ctx to allow profile/trace data to be gathered and processed if pprof { - d = 2 * pprofDur + d += pprofDur } innerCtx, cancel := context.WithTimeout(ctx, d) defer cancel() @@ -217,7 +218,7 @@ func diagnosticsPprofCmd(streams *cli.IOStreams, dur time.Duration, outFile, pTy } ctx := handleSignal(context.Background()) - innerCtx, cancel := context.WithTimeout(ctx, dur*2) + innerCtx, cancel := context.WithTimeout(ctx, dur+time.Second*30) defer cancel() daemon := client.New() diff --git a/x-pack/elastic-agent/pkg/agent/control/client/client.go b/x-pack/elastic-agent/pkg/agent/control/client/client.go index dbc586ea81ad..900718bc6a69 100644 --- a/x-pack/elastic-agent/pkg/agent/control/client/client.go +++ b/x-pack/elastic-agent/pkg/agent/control/client/client.go @@ -103,7 +103,7 @@ type Client interface { Upgrade(ctx context.Context, version string, sourceURI string) (string, error) // ProcMeta gathers running process meta-data. ProcMeta(ctx context.Context) ([]ProcMeta, error) - // Ppro + // Pprof gathers data from the /debug/pprof/ endpoints specified. 
Pprof(ctx context.Context, d time.Duration, pprofTypes []proto.PprofOption, appName, routeKey string) (map[string][]ProcPProf, error) } diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go index 98561774288a..8be5d222c64d 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/server.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -214,9 +214,9 @@ func (s *Server) ProcMeta(ctx context.Context, _ *proto.Empty) (*proto.ProcMetaR for n, spec := range specs { endpoint := monitoring.MonitoringEndpoint(spec, runtime.GOOS, rk) - client := newSocketRequester(time.Second*5, n, rk, endpoint) + client := newSocketRequester(n, rk, endpoint) - procMeta := client.ProcMeta(ctx) + procMeta := client.procMeta(ctx) resp.Procs = append(resp.Procs, procMeta) } } @@ -244,11 +244,11 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr // retrieve elastic-agent pprof data if requested or application is unspecified. if req.AppName == "" || req.AppName == "elastic-agent" { endpoint := beats.AgentMonitoringEndpoint(runtime.GOOS) - c := newSocketRequester(dur*2, "elastic-agent", "", endpoint) + c := newSocketRequester("elastic-agent", "", endpoint) for _, opt := range req.PprofType { wg.Add(1) go func(opt proto.PprofOption) { - res := c.GetPprof(ctx, opt, dur) + res := c.getPprof(ctx, opt, dur) ch <- res wg.Done() }(opt) @@ -274,8 +274,9 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr } specs := sp.Specs() for n, spec := range specs { + // Skip the app if it does not match and one has been requested. if req.AppName != "" && req.AppName != n { - continue // Skip the app if it does not match and one has been requested. 
+ continue } endpoint := monitoring.MonitoringEndpoint(spec, runtime.GOOS, rk) c := newSocketRequester(dur*2, n, rk, endpoint) @@ -284,7 +285,7 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr for _, opt := range req.PprofType { wg.Add(1) go func(opt proto.PprofOption) { - res := c.GetPprof(ctx, opt, dur) + res := c.getPprof(ctx, opt, dur) ch <- res wg.Done() }(opt) @@ -313,8 +314,8 @@ type socketRequester struct { routeKey string } -func newSocketRequester(timeout time.Duration, appName, routeKey, endpoint string) *socketRequester { - c := http.Client{Timeout: timeout} +func newSocketRequester(appName, routeKey, endpoint string) *socketRequester { + c := http.Client{} if strings.HasPrefix(endpoint, "unix://") { c.Transport = &http.Transport{ Proxy: nil, @@ -356,8 +357,8 @@ func (r *socketRequester) getPath(ctx context.Context, path string) (*http.Respo } -// ProcMeta will return process metadata by querying the "/" path. -func (r *socketRequester) ProcMeta(ctx context.Context) *proto.ProcMeta { +// procMeta will return process metadata by querying the "/" path. +func (r *socketRequester) procMeta(ctx context.Context) *proto.ProcMeta { pm := &proto.ProcMeta{ Name: r.appName, RouteKey: r.routeKey, @@ -405,8 +406,8 @@ var pprofEndpoints = map[proto.PprofOption]string{ proto.PprofOption_TRACE: "/debug/pprof/trace", } -// GetProf will gather pprof data specified by the option. -func (r *socketRequester) GetPprof(ctx context.Context, opt proto.PprofOption, dur time.Duration) *proto.PprofResult { +// getProf will gather pprof data specified by the option. 
+func (r *socketRequester) getPprof(ctx context.Context, opt proto.PprofOption, dur time.Duration) *proto.PprofResult { res := &proto.PprofResult{ AppName: r.appName, RouteKey: r.routeKey, From c82f697bd70bbca4876dc462e7f95cea16a5f558 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Mon, 22 Nov 2021 19:40:07 -0800 Subject: [PATCH 09/17] Fix linting issue add timeout flag Fix linting issues with new command. Add a timeout flag for when pprof info is gathered. Flag will let users specify the command timeout value. This value should be greater than the pprof-duration as it needs to gather and process pprof data. --- .../pkg/agent/cmd/diagnostics.go | 42 ++++++++++++++----- .../pkg/agent/control/server/server.go | 4 +- 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go b/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go index 9d50a1439c42..a3addf3d169c 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go @@ -93,8 +93,14 @@ func newDiagnosticsCollectCommandWithArgs(_ []string, streams *cli.IOStreams) *c pprof, _ := c.Flags().GetBool("pprof") d, _ := c.Flags().GetDuration("pprof-duration") + // get the command timeout value only if one is set explicitly. + // otherwise a value of 30s + pprof-duration will be used. 
+ var timeout time.Duration + if c.Flags().Changed("timeout") { + timeout, _ = c.Flags().GetDuration("timeout") + } - return diagnosticsCollectCmd(streams, file, output, pprof, d) + return diagnosticsCollectCmd(streams, file, output, pprof, d, timeout) }, } @@ -102,6 +108,7 @@ func newDiagnosticsCollectCommandWithArgs(_ []string, streams *cli.IOStreams) *c cmd.Flags().String("output", "yaml", "Output the collected information in either json, or yaml (default: yaml)") // replace output flag with different options cmd.Flags().Bool("pprof", false, "Collect all pprof data from all running applications.") cmd.Flags().Duration("pprof-duration", time.Second*30, "The duration to collect trace and profiling data from the debug/pprof endpoints. (default: 30s)") + cmd.Flags().Duration("timeout", time.Second*30, "The timeout for the diagnostics collect command, will be either 30s or 30s+pprof-duration by default. Should be longer then pprof-duration when pprof is enabled as the command needs time to process/archive the response.") return cmd } @@ -116,16 +123,24 @@ func newDiagnosticsPprofCommandWithArgs(_ []string, streams *cli.IOStreams) *cob file, _ := c.Flags().GetString("file") pprofType, _ := c.Flags().GetString("pprof-type") d, _ := c.Flags().GetDuration("pprof-duration") + // get the command timeout value only if one is set explicitly. + // otherwise a value of 30s + pprof-duration will be used. + var timeout time.Duration + if c.Flags().Changed("timeout") { + timeout, _ = c.Flags().GetDuration("timeout") + } + pprofApp, _ := c.Flags().GetString("pprof-application") pprofRK, _ := c.Flags().GetString("pprof-route-key") - return diagnosticsPprofCmd(streams, d, file, pprofType, pprofApp, pprofRK) + return diagnosticsPprofCmd(streams, d, timeout, file, pprofType, pprofApp, pprofRK) }, } cmd.Flags().StringP("file", "f", "", "name of the output file, stdout if unspecified.") cmd.Flags().String("pprof-type", "profile", "Collect all pprof data from all running applications. 
Select one of [allocs, block, cmdline, goroutine, heap, mutex, profile, threadcreate, trace]") cmd.Flags().Duration("pprof-duration", time.Second*30, "The duration to collect trace and profiling data from the debug/pprof endpoints. (default: 30s)") + cmd.Flags().Duration("timeout", time.Second*90, "The timeout for the pprof collect command, defaults to 30s+pprof-duration by default. Should be longer then pprof-duration as the command needs time to process the response.") cmd.Flags().String("pprof-application", "elastic-agent", "Application name to collect pprof data from.") cmd.Flags().String("pprof-route-key", "default", "Route key to collect pprof data from.") @@ -160,19 +175,22 @@ func diagnosticCmd(streams *cli.IOStreams, cmd *cobra.Command, args []string) er return outputFunc(streams.Out, diag) } -func diagnosticsCollectCmd(streams *cli.IOStreams, fileName, outputFormat string, pprof bool, pprofDur time.Duration) error { +func diagnosticsCollectCmd(streams *cli.IOStreams, fileName, outputFormat string, pprof bool, pprofDur, cmdTimeout time.Duration) error { err := tryContainerLoadPaths() if err != nil { return err } ctx := handleSignal(context.Background()) - d := time.Second * 30 - // Add more time to the ctx to allow profile/trace data to be gathered and processed - if pprof { - d += pprofDur + // set command timeout to 30s or 30s+pprofDur if no timeout is specified + if cmdTimeout == time.Duration(0) { + cmdTimeout = time.Second * 30 + if pprof { + cmdTimeout += pprofDur + } + } - innerCtx, cancel := context.WithTimeout(ctx, d) + innerCtx, cancel := context.WithTimeout(ctx, cmdTimeout) defer cancel() diag, err := getDiagnostics(innerCtx) @@ -206,7 +224,7 @@ func diagnosticsCollectCmd(streams *cli.IOStreams, fileName, outputFormat string return nil } -func diagnosticsPprofCmd(streams *cli.IOStreams, dur time.Duration, outFile, pType, appName, rk string) error { +func diagnosticsPprofCmd(streams *cli.IOStreams, dur, cmdTimeout time.Duration, outFile, pType, 
appName, rk string) error { pt, ok := proto.PprofOption_value[strings.ToUpper(pType)] if !ok { return fmt.Errorf("unknown pprof-type %q, select one of [allocs, block, cmdline, goroutine, heap, mutex, profile, threadcreate, trace]", pType) @@ -218,7 +236,11 @@ func diagnosticsPprofCmd(streams *cli.IOStreams, dur time.Duration, outFile, pTy } ctx := handleSignal(context.Background()) - innerCtx, cancel := context.WithTimeout(ctx, dur+time.Second*30) + // set cmdTimeout to 30s+dur if not set. + if cmdTimeout == time.Duration(0) { + cmdTimeout = time.Second*30 + dur + } + innerCtx, cancel := context.WithTimeout(ctx, cmdTimeout) defer cancel() daemon := client.New() diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go index 8be5d222c64d..daff2ada5787 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/server.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -243,7 +243,7 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr // retrieve elastic-agent pprof data if requested or application is unspecified. if req.AppName == "" || req.AppName == "elastic-agent" { - endpoint := beats.AgentMonitoringEndpoint(runtime.GOOS) + endpoint := beats.AgentMonitoringEndpoint(runtime.GOOS, nil) //TODO get monitoring conf? c := newSocketRequester("elastic-agent", "", endpoint) for _, opt := range req.PprofType { wg.Add(1) @@ -279,7 +279,7 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr continue } endpoint := monitoring.MonitoringEndpoint(spec, runtime.GOOS, rk) - c := newSocketRequester(dur*2, n, rk, endpoint) + c := newSocketRequester(n, rk, endpoint) // Launch a concurrent goroutine to gather all pprof endpoints from a socket. 
for _, opt := range req.PprofType { From a9b36939f00975835fec2558eb576c9da3a3a227 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Mon, 22 Nov 2021 19:51:58 -0800 Subject: [PATCH 10/17] Add more command help text. --- x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go b/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go index a3addf3d169c..37e0ddee1149 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go @@ -117,7 +117,7 @@ func newDiagnosticsPprofCommandWithArgs(_ []string, streams *cli.IOStreams) *cob cmd := &cobra.Command{ Use: "pprof", Short: "Collect pprof information from a running process.", - Long: "Collect pprof information from the elastic-agent or one of its processes and write to stdout or a file.", + Long: "Collect pprof information from the elastic-agent or one of its processes and write to stdout or a file.\nBy default it will gather a 30s profile of the elastic-agent and output on stdout.", Args: cobra.MaximumNArgs(5), RunE: func(c *cobra.Command, args []string) error { file, _ := c.Flags().GetString("file") @@ -140,7 +140,7 @@ func newDiagnosticsPprofCommandWithArgs(_ []string, streams *cli.IOStreams) *cob cmd.Flags().StringP("file", "f", "", "name of the output file, stdout if unspecified.") cmd.Flags().String("pprof-type", "profile", "Collect all pprof data from all running applications. Select one of [allocs, block, cmdline, goroutine, heap, mutex, profile, threadcreate, trace]") cmd.Flags().Duration("pprof-duration", time.Second*30, "The duration to collect trace and profiling data from the debug/pprof endpoints. (default: 30s)") - cmd.Flags().Duration("timeout", time.Second*90, "The timeout for the pprof collect command, defaults to 30s+pprof-duration by default. 
Should be longer then pprof-duration as the command needs time to process the response.") + cmd.Flags().Duration("timeout", time.Second*60, "The timeout for the pprof collect command, defaults to 30s+pprof-duration by default. Should be longer then pprof-duration as the command needs time to process the response.") cmd.Flags().String("pprof-application", "elastic-agent", "Application name to collect pprof data from.") cmd.Flags().String("pprof-route-key", "default", "Route key to collect pprof data from.") @@ -282,8 +282,8 @@ func diagnosticsPprofCmd(streams *cli.IOStreams, dur, cmdTimeout time.Duration, fmt.Fprintf(streams.Out, "pprof data written to %s\n", outFile) return nil } - streams.Out.Write(res.Result) - return nil + _, err = streams.Out.Write(res.Result) + return err } func getDiagnostics(ctx context.Context) (DiagnosticsInfo, error) { From b4ffdf0e370018d756ebd47f814bf0f6e4d7ba80 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Mon, 22 Nov 2021 20:02:33 -0800 Subject: [PATCH 11/17] Add CHANGELOG --- x-pack/elastic-agent/CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index d0de29fdea8e..4e452dfbb798 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -150,3 +150,4 @@ - Add diagnostics collect command to gather beat metadata, config, policy, and logs and bundle it into an archive. {pull}28461[28461] - Add `KIBANA_FLEET_SERVICE_TOKEN` to Elastic Agent container. {pull}28096[28096] - Enable pprof endpoints for beats processes. Allow pprof endpoints for elastic-agent if enabled. {pull}28983[28983] +- Add `--pprof` flag to `elastic-agent diagnostics` and an `elastic-agent pprof` command to allow operators to gather pprof data from the agent and beats running under it. 
{pull}28798[28798] From 8f145a6d7adc2dc78d7a826e77b1aa2b1768bcf0 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Wed, 24 Nov 2021 19:35:29 -0800 Subject: [PATCH 12/17] move spec collection for routes to fn --- .../pkg/agent/control/server/server.go | 137 +++++++++++------- 1 file changed, 81 insertions(+), 56 deletions(-) diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go index daff2ada5787..373fb78c4874 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/server.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -50,6 +50,12 @@ type specer interface { Specs() map[string]program.Spec } +type specInfo struct { + spec program.Spec + app string + rk string +} + // New creates a new control protocol server. func New(log *logger.Logger, rex reexec.ExecManager, statusCtrl status.Controller, up *upgrade.Upgrader) *Server { return &Server{ @@ -197,29 +203,16 @@ func (s *Server) ProcMeta(ctx context.Context, _ *proto.Empty) (*proto.ProcMetaR Procs: []*proto.ProcMeta{}, } - routes := s.routeFn() - for _, rk := range routes.Keys() { - programs, ok := routes.Get(rk) - if !ok { - s.logger.With("route_key", rk).Warn("Unable to retrieve route.") - continue - } + // gather spec data for all rk/apps running + specs := s.getSpecInfo("", "") + for _, si := range specs { + endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk) + client := newSocketRequester(si.app, si.rk, endpoint) - sp, ok := programs.(specer) - if !ok { - s.logger.With("route_key", rk, "route", programs).Warn("Unable to cast route as specer.") - continue - } - specs := sp.Specs() - - for n, spec := range specs { - endpoint := monitoring.MonitoringEndpoint(spec, runtime.GOOS, rk) - client := newSocketRequester(n, rk, endpoint) - - procMeta := client.procMeta(ctx) - resp.Procs = append(resp.Procs, procMeta) - } + procMeta := client.procMeta(ctx) + resp.Procs = append(resp.Procs, procMeta) } + return 
resp, nil } @@ -255,41 +248,19 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr } } - routes := s.routeFn() - for _, rk := range routes.Keys() { - // Skip the rk if it does not match and one has been requested. - if req.RouteKey != "" && req.RouteKey != rk { - continue - } - programs, ok := routes.Get(rk) - if !ok { - s.logger.With("route_key", rk).Warn("Unable to retrieve route.") - continue - } - - sp, ok := programs.(specer) - if !ok { - s.logger.With("route_key", rk, "route", programs).Warn("Unable to cast route as specer.") - continue - } - specs := sp.Specs() - for n, spec := range specs { - // Skip the app if it does not match and one has been requested. - if req.AppName != "" && req.AppName != n { - continue - } - endpoint := monitoring.MonitoringEndpoint(spec, runtime.GOOS, rk) - c := newSocketRequester(n, rk, endpoint) - - // Launch a concurrent goroutine to gather all pprof endpoints from a socket. - for _, opt := range req.PprofType { - wg.Add(1) - go func(opt proto.PprofOption) { - res := c.getPprof(ctx, opt, dur) - ch <- res - wg.Done() - }(opt) - } + // get requested rk/appname spec or all specs + specs := s.getSpecInfo(req.RouteKey, req.AppName) + for _, si := range specs { + endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk) + c := newSocketRequester(si.app, si.rk, endpoint) + // Launch a concurrent goroutine to gather all pprof endpoints from a socket. + for _, opt := range req.PprofType { + wg.Add(1) + go func(opt proto.PprofOption) { + res := c.getPprof(ctx, opt, dur) + ch <- res + wg.Done() + }(opt) } } @@ -306,6 +277,60 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr return resp, nil } +// getSpecs will return the specs for the program associated with the specified route key/app name, or all programs if no key(s) are specified. +// if matchRK or matchApp are empty all results will be returned. 
+func (s *Server) getSpecInfo(matchRK, matchApp string) []specInfo { + routes := s.routeFn() + + // find specInfo for a specified rk/app + if matchRK != "" && matchApp != "" { + programs, ok := routes.Get(matchRK) + if !ok { + s.logger.With("route_key", matchRK).Debug("No matching route key found.") + return []specInfo{} + } + sp, ok := programs.(specer) + if !ok { + s.logger.With("route_key", matchRK, "route", programs).Warn("Unable to cast route as specer.") + return []specInfo{} + } + specs := sp.Specs() + + spec, ok := specs[matchApp] + if !ok { + s.logger.With("route_key", matchRK, "application_name", matchApp).Debug("No matching route key/application name found.") + return []specInfo{} + } + return []specInfo{specInfo{spec: spec, app: matchApp, rk: matchRK}} + } + + // gather specInfo for all rk/app values + res := make([]specInfo, 0) + for _, rk := range routes.Keys() { + programs, ok := routes.Get(rk) + if !ok { + // we do not expect to ever hit this code path + // if this log message occurs then the agent is unable to access one of the keys that is returned by the route function + // might be a race condition if someone tries to update the policy to remove an output? 
+ s.logger.With("route_key", rk).Warn("Unable to retrieve route.") + continue + } + sp, ok := programs.(specer) + if !ok { + s.logger.With("route_key", matchRK, "route", programs).Warn("Unable to cast route as specer.") + continue + } + for n, spec := range sp.Specs() { + res = append(res, specInfo{ + rk: rk, + app: n, + spec: spec, + }) + } + } + return res +} + // socketRequester is a struct to gather (diagnostics) data from a socket opened by elastic-agent or one if it's processes type socketRequester struct { c http.Client From 1b2a9ff268e51085872dbd680a0d533a80dec701 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Wed, 24 Nov 2021 19:56:22 -0800 Subject: [PATCH 13/17] add monitoringCfg reference to control server --- x-pack/elastic-agent/pkg/agent/cmd/run.go | 1 + .../pkg/agent/control/server/server.go | 31 +++++++++++++------ 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/x-pack/elastic-agent/pkg/agent/cmd/run.go b/x-pack/elastic-agent/pkg/agent/cmd/run.go index 1c8c1dd4916f..a3f83849f213 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/run.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/run.go @@ -148,6 +148,7 @@ func run(streams *cli.IOStreams, override cfgOverrider) error { } control.SetRouteFn(app.Routes) + control.SetMonitoringCfg(cfg.Settings.MonitoringConfig) serverStopFn, err := setupMetrics(agentInfo, logger, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, app) if err != nil { diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go index 373fb78c4874..57223cd0b1db 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/server.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/beats" monitoring "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/beats" + 
monitoringCfg "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/socket" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" @@ -36,14 +37,15 @@ import ( // Server is the daemon side of the control protocol. type Server struct { - logger *logger.Logger - rex reexec.ExecManager - statusCtrl status.Controller - up *upgrade.Upgrader - routeFn func() *sorted.Set - listener net.Listener - server *grpc.Server - lock sync.RWMutex + logger *logger.Logger + rex reexec.ExecManager + statusCtrl status.Controller + up *upgrade.Upgrader + routeFn func() *sorted.Set + monitoringCfg *monitoringCfg.MonitoringConfig + listener net.Listener + server *grpc.Server + lock sync.RWMutex } type specer interface { @@ -80,6 +82,14 @@ func (s *Server) SetRouteFn(routesFetchFn func() *sorted.Set) { s.routeFn = routesFetchFn } +// SetMonitoringCfg sets a reference to the monitoring config used by the running agent. +// the controller references this config to find out if pprof is enabled for the agent or not +func (s *Server) SetMonitoringCfg(cfg *monitoringCfg.MonitoringConfig) { + s.lock.Lock() + defer s.lock.Unlock() + s.monitoringCfg = cfg +} + // Start starts the GRPC endpoint and accepts new connections. func (s *Server) Start() error { if s.server != nil { @@ -236,7 +246,10 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr // retrieve elastic-agent pprof data if requested or application is unspecified. if req.AppName == "" || req.AppName == "elastic-agent" { - endpoint := beats.AgentMonitoringEndpoint(runtime.GOOS, nil) //TODO get monitoring conf? 
+ if !s.monitoringCfg.Pprof { + return nil, fmt.Errorf("elastic-agent pprof disabled") + } + endpoint := beats.AgentMonitoringEndpoint(runtime.GOOS, s.monitoringCfg.HTTP) c := newSocketRequester("elastic-agent", "", endpoint) for _, opt := range req.PprofType { wg.Add(1) From e6707c47f0d1ad354d88c99f6c98529e40b0f469 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Thu, 2 Dec 2021 11:33:19 -0800 Subject: [PATCH 14/17] elastic-agent server only processes pprof requests when enabled --- x-pack/elastic-agent/pkg/agent/control/server/server.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go index 57223cd0b1db..d0e36cf824d0 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/server.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -228,6 +228,10 @@ func (s *Server) ProcMeta(ctx context.Context, _ *proto.Empty) (*proto.ProcMetaR // Pprof returns /debug/pprof data for the requested applicaiont-route_key or all running applications. func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.PprofResponse, error) { + if !s.monitoringCfg.Pprof { + return nil, fmt.Errorf("agent.monitoring.pprof disabled") + } + if s.routeFn == nil { return nil, errors.New("route function is nil") } @@ -246,9 +250,6 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr // retrieve elastic-agent pprof data if requested or application is unspecified. 
if req.AppName == "" || req.AppName == "elastic-agent" { - if !s.monitoringCfg.Pprof { - return nil, fmt.Errorf("elastic-agent pprof disabled") - } endpoint := beats.AgentMonitoringEndpoint(runtime.GOOS, s.monitoringCfg.HTTP) c := newSocketRequester("elastic-agent", "", endpoint) for _, opt := range req.PprofType { From 1bdc078166efd4e3e80f9beb00ff8ada81489cf7 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Thu, 2 Dec 2021 15:48:03 -0800 Subject: [PATCH 15/17] Fix error message fix commands only on elastic-agent --- x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go | 2 +- x-pack/elastic-agent/pkg/agent/control/server/server.go | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go b/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go index 37e0ddee1149..b32edf6df2da 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/diagnostics.go @@ -260,7 +260,7 @@ func diagnosticsPprofCmd(streams *cli.IOStreams, dur, cmdTimeout time.Duration, return fmt.Errorf("route key %q not found in response data (map length: %d)", rk, len(pprofData)) } if len(pArr) != 1 { - return fmt.Errorf("route key application length 1 expected, recieved %d", len(pArr)) + return fmt.Errorf("pprof type length 1 expected, recieved %d", len(pArr)) } res := pArr[0] diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go index d0e36cf824d0..df35dc8d6ad3 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/server.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -263,7 +263,10 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr } // get requested rk/appname spec or all specs - specs := s.getSpecInfo(req.RouteKey, req.AppName) + var specs []specInfo + if req.AppName != "elastic-agent" { + specs = s.getSpecInfo(req.RouteKey, req.AppName) + } for _, si := range specs 
{ endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk) c := newSocketRequester(si.app, si.rk, endpoint) From 91140b72c93de1822002d4ff3f75a13adacdb009 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Mon, 6 Dec 2021 11:58:31 -0800 Subject: [PATCH 16/17] Add pprof fleet.yml, fix nil reference --- .../actions/handlers/handler_action_policy_change.go | 7 ++++--- x-pack/elastic-agent/pkg/agent/control/server/server.go | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/x-pack/elastic-agent/pkg/agent/application/pipeline/actions/handlers/handler_action_policy_change.go b/x-pack/elastic-agent/pkg/agent/application/pipeline/actions/handlers/handler_action_policy_change.go index f14ec7aea81f..e00ccfc844ba 100644 --- a/x-pack/elastic-agent/pkg/agent/application/pipeline/actions/handlers/handler_action_policy_change.go +++ b/x-pack/elastic-agent/pkg/agent/application/pipeline/actions/handlers/handler_action_policy_change.go @@ -197,9 +197,10 @@ func fleetToReader(agentInfo *info.AgentInfo, cfg *configuration.Configuration) configToStore := map[string]interface{}{ "fleet": cfg.Fleet, "agent": map[string]interface{}{ - "id": agentInfo.AgentID(), - "logging.level": cfg.Settings.LoggingConfig.Level, - "monitoring.http": cfg.Settings.MonitoringConfig.HTTP, + "id": agentInfo.AgentID(), + "logging.level": cfg.Settings.LoggingConfig.Level, + "monitoring.http": cfg.Settings.MonitoringConfig.HTTP, + "monitoring.pprof": cfg.Settings.MonitoringConfig.Pprof, }, } diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go index df35dc8d6ad3..e7117a488fc9 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/server.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -228,7 +228,7 @@ func (s *Server) ProcMeta(ctx context.Context, _ *proto.Empty) (*proto.ProcMetaR // Pprof returns /debug/pprof data for the requested applicaiont-route_key or all running applications. 
func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.PprofResponse, error) { - if !s.monitoringCfg.Pprof { + if s.monitoringCfg == nil || !s.monitoringCfg.Pprof { return nil, fmt.Errorf("agent.monitoring.pprof disabled") } From 4e6ecbb9321e0253b291e02bbfcd80f2474af0c7 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Mon, 13 Dec 2021 20:53:46 -0800 Subject: [PATCH 17/17] Change pprof setting name to monitoring.pprof.enabled Change the setting in elastic agent from agent.monitoring.pprof to agent.monitoring.pprof.enabled so that policy updates (such as the one that occurs when the agent is starting in fleet mode) do not use the default false value if the user has injected the setting into fleet.yml --- x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl | 2 +- .../_meta/config/common.reference.p2.yml.tmpl | 2 +- .../_meta/config/elastic-agent.docker.yml.tmpl | 2 +- x-pack/elastic-agent/elastic-agent.docker.yml | 2 +- x-pack/elastic-agent/elastic-agent.reference.yml | 2 +- x-pack/elastic-agent/elastic-agent.yml | 2 +- x-pack/elastic-agent/pkg/agent/cmd/run.go | 2 +- .../elastic-agent/pkg/agent/control/server/server.go | 2 +- .../pkg/core/monitoring/beats/beats_monitor.go | 6 +++++- .../elastic-agent/pkg/core/monitoring/config/config.go | 10 ++++++++-- 10 files changed, 21 insertions(+), 11 deletions(-) diff --git a/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl b/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl index de16df8ea7f7..e8f4c31e8e18 100644 --- a/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl +++ b/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl @@ -35,7 +35,7 @@ inputs: # metrics: true # # exposes /debug/pprof/ endpoints # # recommended that these endpoints are only enabled if the monitoring endpoint is set to localhost -# pprof: false +# pprof.enabled: false # # exposes agent metrics using http, by default sockets and named pipes are used # http: # # enables http endpoint diff --git
a/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl b/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl index 43e484646306..8a3ef0773578 100644 --- a/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl +++ b/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl @@ -109,7 +109,7 @@ inputs: # metrics: false # # exposes /debug/pprof/ endpoints # # recommended that these endpoints are only enabled if the monitoring endpoint is set to localhost -# pprof: false +# pprof.enabled: false # # exposes agent metrics using http, by default sockets and named pipes are used # http: # # enables http endpoint diff --git a/x-pack/elastic-agent/_meta/config/elastic-agent.docker.yml.tmpl b/x-pack/elastic-agent/_meta/config/elastic-agent.docker.yml.tmpl index 69a80678db87..17201aa6dcea 100644 --- a/x-pack/elastic-agent/_meta/config/elastic-agent.docker.yml.tmpl +++ b/x-pack/elastic-agent/_meta/config/elastic-agent.docker.yml.tmpl @@ -109,7 +109,7 @@ inputs: # metrics: false # # exposes /debug/pprof/ endpoints # # recommended that these endpoints are only enabled if the monitoring endpoint is set to localhost -# pprof: false +# pprof.enabled: false # # exposes agent metrics using http, by default sockets and named pipes are used # http: # # enables http endpoint diff --git a/x-pack/elastic-agent/elastic-agent.docker.yml b/x-pack/elastic-agent/elastic-agent.docker.yml index 9bf7307aacfd..b7d5ff2017ea 100644 --- a/x-pack/elastic-agent/elastic-agent.docker.yml +++ b/x-pack/elastic-agent/elastic-agent.docker.yml @@ -109,7 +109,7 @@ inputs: # metrics: false # # exposes /debug/pprof/ endpoints # # recommended that these endpoints are only enabled if the monitoring endpoint is set to localhost -# pprof: false +# pprof.enabled: false # # exposes agent metrics using http, by default sockets and named pipes are used # http: # # enables http endpoint diff --git a/x-pack/elastic-agent/elastic-agent.reference.yml 
b/x-pack/elastic-agent/elastic-agent.reference.yml index 67922a1d89cc..da04df95ea81 100644 --- a/x-pack/elastic-agent/elastic-agent.reference.yml +++ b/x-pack/elastic-agent/elastic-agent.reference.yml @@ -115,7 +115,7 @@ inputs: # metrics: false # # exposes /debug/pprof/ endpoints # # recommended that these endpoints are only enabled if the monitoring endpoint is set to localhost -# pprof: false +# pprof.enabled: false # # exposes agent metrics using http, by default sockets and named pipes are used # http: # # enables http endpoint diff --git a/x-pack/elastic-agent/elastic-agent.yml b/x-pack/elastic-agent/elastic-agent.yml index d40b6518e8d5..802df992ba7c 100644 --- a/x-pack/elastic-agent/elastic-agent.yml +++ b/x-pack/elastic-agent/elastic-agent.yml @@ -41,7 +41,7 @@ inputs: # metrics: true # # exposes /debug/pprof/ endpoints # # recommended that these endpoints are only enabled if the monitoring endpoint is set to localhost -# pprof: false +# pprof.enabled: false # # exposes agent metrics using http, by default sockets and named pipes are used # http: # # enables http endpoint diff --git a/x-pack/elastic-agent/pkg/agent/cmd/run.go b/x-pack/elastic-agent/pkg/agent/cmd/run.go index a3f83849f213..4ea0a7bcac71 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/run.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/run.go @@ -314,7 +314,7 @@ func setupMetrics(agentInfo *info.AgentInfo, logger *logger.Logger, operatingSys } s.Start() - if cfg.Pprof { + if cfg.Pprof != nil && cfg.Pprof.Enabled { s.AttachPprof() } diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go index e7117a488fc9..12ed4650eedf 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/server.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -228,7 +228,7 @@ func (s *Server) ProcMeta(ctx context.Context, _ *proto.Empty) (*proto.ProcMetaR // Pprof returns /debug/pprof data for the requested applicaiont-route_key or all 
running applications. func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.PprofResponse, error) { - if s.monitoringCfg == nil || !s.monitoringCfg.Pprof { + if s.monitoringCfg == nil || s.monitoringCfg.Pprof == nil || !s.monitoringCfg.Pprof.Enabled { return nil, fmt.Errorf("agent.monitoring.pprof disabled") } diff --git a/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go b/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go index cb0519f98063..939aa89c99d5 100644 --- a/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go +++ b/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go @@ -35,6 +35,7 @@ type Monitor struct { func NewMonitor(downloadConfig *artifact.Config, monitoringCfg *monitoringConfig.MonitoringConfig, logMetrics bool) *Monitor { if monitoringCfg == nil { monitoringCfg = monitoringConfig.DefaultConfig() + monitoringCfg.Pprof = &monitoringConfig.PprofConfig{Enabled: false} } monitoringCfg.LogMetrics = logMetrics @@ -55,6 +56,9 @@ func (b *Monitor) Reload(rawConfig *config.Config) error { if cfg == nil || cfg.Settings == nil || cfg.Settings.MonitoringConfig == nil { b.config = monitoringConfig.DefaultConfig() } else { + if cfg.Settings.MonitoringConfig.Pprof == nil { + cfg.Settings.MonitoringConfig.Pprof = b.config.Pprof + } b.config = cfg.Settings.MonitoringConfig logMetrics := true if cfg.Settings.LoggingConfig != nil { @@ -123,7 +127,7 @@ func (b *Monitor) EnrichArgs(spec program.Spec, pipelineID string, args []string "-E", "http.enabled=true", "-E", "http.host="+endpoint, ) - if b.config.Pprof { + if b.config.Pprof != nil && b.config.Pprof.Enabled { appendix = append(appendix, "-E", "http.pprof.enabled=true", ) diff --git a/x-pack/elastic-agent/pkg/core/monitoring/config/config.go b/x-pack/elastic-agent/pkg/core/monitoring/config/config.go index 10f220fcc5af..3004561bd862 100644 --- a/x-pack/elastic-agent/pkg/core/monitoring/config/config.go +++ 
b/x-pack/elastic-agent/pkg/core/monitoring/config/config.go @@ -15,7 +15,7 @@ type MonitoringConfig struct { LogMetrics bool `yaml:"-" config:"-"` HTTP *MonitoringHTTPConfig `yaml:"http" config:"http"` Namespace string `yaml:"namespace" config:"namespace"` - Pprof bool `yaml:"pprof" config:"pprof"` + Pprof *PprofConfig `yaml:"pprof" config:"pprof"` } // MonitoringHTTPConfig is a config defining HTTP endpoint published by agent @@ -27,6 +27,13 @@ type MonitoringHTTPConfig struct { Port int `yaml:"port" config:"port" validate:"min=0,max=65535,nonzero"` } +// PprofConfig is a struct for the pprof enablement flag. +// It is nil by default so that the agent can use a value that the user has injected into fleet.yml as the source of truth that is passed to beats. +// TODO get this value from Kibana? +type PprofConfig struct { + Enabled bool `yaml:"enabled" config:"enabled"` +} + // DefaultConfig creates a config with pre-set default values. func DefaultConfig() *MonitoringConfig { return &MonitoringConfig{ @@ -39,6 +46,5 @@ func DefaultConfig() *MonitoringConfig { Port: defaultPort, }, Namespace: defaultNamespace, - Pprof: false, } }