diff --git a/NOTICE.txt b/NOTICE.txt index db86aec50f91..96cd4f823748 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -10242,6 +10242,43 @@ Contents of probable licence file $GOMODCACHE/github.com/gorhill/cronexpr@v0.0.0 limitations under the License. +-------------------------------------------------------------------------------- +Dependency : github.com/gorilla/mux +Version: v1.7.2 +Licence type (autodetected): BSD-3-Clause +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/gorilla/mux@v1.7.2/LICENSE: + +Copyright (c) 2012-2018 The Gorilla Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + -------------------------------------------------------------------------------- Dependency : github.com/h2non/filetype Version: v1.1.1-0.20201130172452-f60988ab73d5 @@ -31247,43 +31284,6 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. --------------------------------------------------------------------------------- -Dependency : github.com/gorilla/mux -Version: v1.7.2 -Licence type (autodetected): BSD-3-Clause --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/github.com/gorilla/mux@v1.7.2/LICENSE: - -Copyright (c) 2012-2018 The Gorilla Authors. All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are -met: - - * Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above -copyright notice, this list of conditions and the following disclaimer -in the documentation and/or other materials provided with the -distribution. - * Neither the name of Google Inc. nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - -------------------------------------------------------------------------------- Dependency : github.com/gorilla/websocket Version: v1.4.1 diff --git a/go.mod b/go.mod index 15bb085efab9..3763a3577985 100644 --- a/go.mod +++ b/go.mod @@ -95,7 +95,7 @@ require ( github.com/google/gopacket v1.1.18-0.20191009163724-0ad7f2610e34 github.com/google/uuid v1.1.2 github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 - github.com/gorilla/mux v1.7.2 // indirect + github.com/gorilla/mux v1.7.2 github.com/grpc-ecosystem/grpc-gateway v1.13.0 // indirect github.com/h2non/filetype v1.1.1-0.20201130172452-f60988ab73d5 github.com/hashicorp/go-multierror v1.1.0 diff --git a/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl b/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl index d73ad623a26a..b706c5ba8019 100644 --- a/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl +++ b/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl @@ -33,6 +33,15 @@ inputs: # logs: true # # enables metrics monitoring # metrics: true +# # exposes agent metrics using http, by default sockets and named pipes are used +# http: +# # enables http endpoint +# enabled: false +# # The HTTP endpoint will bind to this hostname, IP address, unix socket or named pipe. +# # When using IP addresses, it is recommended to only use localhost. +# host: localhost +# # Port on which the HTTP endpoint will bind. Default is 0 meaning feature is disabled. +# port: 0 # # Allow fleet to reload its configuration locally on disk. # # Notes: Only specific process configuration will be reloaded. diff --git a/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl b/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl index 055f6444f166..07ff537ee7fd 100644 --- a/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl +++ b/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl @@ -107,6 +107,15 @@ inputs: # logs: false # # enables metrics monitoring # metrics: false +# # exposes agent metrics using http, by default sockets and named pipes are used +# http: +# # enables http endpoint +# enabled: false +# # The HTTP endpoint will bind to this hostname, IP address, unix socket or named pipe. +# # When using IP addresses, it is recommended to only use localhost. +# host: localhost +# # Port on which the HTTP endpoint will bind. Default is 0 meaning feature is disabled. +# port: 0 # # Allow fleet to reload its configuration locally on disk. # # Notes: Only specific process configuration will be reloaded. diff --git a/x-pack/elastic-agent/_meta/config/elastic-agent.docker.yml.tmpl b/x-pack/elastic-agent/_meta/config/elastic-agent.docker.yml.tmpl index 1e4c71788473..89709c810c66 100644 --- a/x-pack/elastic-agent/_meta/config/elastic-agent.docker.yml.tmpl +++ b/x-pack/elastic-agent/_meta/config/elastic-agent.docker.yml.tmpl @@ -107,6 +107,15 @@ inputs: # logs: false # # enables metrics monitoring # metrics: false +# # exposes agent metrics using http, by default sockets and named pipes are used +# http: +# # enables http endpoint +# enabled: false +# # The HTTP endpoint will bind to this hostname, IP address, unix socket or named pipe. +# # When using IP addresses, it is recommended to only use localhost. +# host: localhost +# # Port on which the HTTP endpoint will bind. Default is 0 meaning feature is disabled. +# port: 0 # # Allow fleet to reload its configuration locally on disk. # # Notes: Only specific process configuration will be reloaded. diff --git a/x-pack/elastic-agent/_meta/elastic-agent.yml b/x-pack/elastic-agent/_meta/elastic-agent.yml index 50d435c4a5cf..09533381ff63 100644 --- a/x-pack/elastic-agent/_meta/elastic-agent.yml +++ b/x-pack/elastic-agent/_meta/elastic-agent.yml @@ -102,6 +102,15 @@ inputs: # logs: false # # enables metrics monitoring # metrics: false +# # exposes agent metrics using http, by default sockets and named pipes are used +# http: +# # enables http endpoint +# enabled: false +# # The HTTP endpoint will bind to this hostname, IP address, unix socket or named pipe. +# # When using IP addresses, it is recommended to only use localhost. +# host: localhost +# # Port on which the HTTP endpoint will bind. Default is 0 meaning feature is disabled. +# port: 0 # # Allow fleet to reload his configuration locally on disk. # # Notes: Only specific process configuration will be reloaded. diff --git a/x-pack/elastic-agent/elastic-agent.docker.yml b/x-pack/elastic-agent/elastic-agent.docker.yml index f20bb9a0ad97..c65c2b07219d 100644 --- a/x-pack/elastic-agent/elastic-agent.docker.yml +++ b/x-pack/elastic-agent/elastic-agent.docker.yml @@ -107,6 +107,15 @@ inputs: # logs: false # # enables metrics monitoring # metrics: false +# # exposes agent metrics using http, by default sockets and named pipes are used +# http: +# # enables http endpoint +# enabled: false +# # The HTTP endpoint will bind to this hostname, IP address, unix socket or named pipe. +# # When using IP addresses, it is recommended to only use localhost. +# host: localhost +# # Port on which the HTTP endpoint will bind. Default is 0 meaning feature is disabled. +# port: 0 # # Allow fleet to reload its configuration locally on disk. # # Notes: Only specific process configuration will be reloaded. diff --git a/x-pack/elastic-agent/elastic-agent.reference.yml b/x-pack/elastic-agent/elastic-agent.reference.yml index 28c9dc8b83dd..023e3ef8aeef 100644 --- a/x-pack/elastic-agent/elastic-agent.reference.yml +++ b/x-pack/elastic-agent/elastic-agent.reference.yml @@ -113,6 +113,15 @@ inputs: # logs: false # # enables metrics monitoring # metrics: false +# # exposes agent metrics using http, by default sockets and named pipes are used +# http: +# # enables http endpoint +# enabled: false +# # The HTTP endpoint will bind to this hostname, IP address, unix socket or named pipe. +# # When using IP addresses, it is recommended to only use localhost. +# host: localhost +# # Port on which the HTTP endpoint will bind. Default is 0 meaning feature is disabled. +# port: 0 # # Allow fleet to reload its configuration locally on disk. # # Notes: Only specific process configuration will be reloaded. diff --git a/x-pack/elastic-agent/elastic-agent.yml b/x-pack/elastic-agent/elastic-agent.yml index cd19a339b3c7..db9568fded4d 100644 --- a/x-pack/elastic-agent/elastic-agent.yml +++ b/x-pack/elastic-agent/elastic-agent.yml @@ -39,6 +39,15 @@ inputs: # logs: true # # enables metrics monitoring # metrics: true +# # exposes agent metrics using http, by default sockets and named pipes are used +# http: +# # enables http endpoint +# enabled: false +# # The HTTP endpoint will bind to this hostname, IP address, unix socket or named pipe. +# # When using IP addresses, it is recommended to only use localhost. +# host: localhost +# # Port on which the HTTP endpoint will bind. Default is 0 meaning feature is disabled. +# port: 0 # # Allow fleet to reload its configuration locally on disk. # # Notes: Only specific process configuration will be reloaded. diff --git a/x-pack/elastic-agent/pkg/agent/application/application.go b/x-pack/elastic-agent/pkg/agent/application/application.go index 96dcac99dff5..53c74afabe50 100644 --- a/x-pack/elastic-agent/pkg/agent/application/application.go +++ b/x-pack/elastic-agent/pkg/agent/application/application.go @@ -11,6 +11,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/sorted" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" @@ -26,6 +27,7 @@ type Application interface { Start() error Stop() error AgentInfo() *info.AgentInfo + Routes() *sorted.Set } type reexecManager interface { diff --git a/x-pack/elastic-agent/pkg/agent/application/fleet_server_bootstrap.go b/x-pack/elastic-agent/pkg/agent/application/fleet_server_bootstrap.go index 6f3dd09335b0..60be85147cfe 100644 --- a/x-pack/elastic-agent/pkg/agent/application/fleet_server_bootstrap.go +++ b/x-pack/elastic-agent/pkg/agent/application/fleet_server_bootstrap.go @@ -9,6 +9,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/sorted" "github.com/elastic/go-sysinfo" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/filters" @@ -113,6 +114,11 @@ func newFleetServerBootstrap( return bootstrapApp, nil } +// Routes returns a list of routes handled by server. +func (b *FleetServerBootstrap) Routes() *sorted.Set { + return b.router.Routes() +} + // Start starts a managed elastic-agent. func (b *FleetServerBootstrap) Start() error { b.log.Info("Agent is starting") diff --git a/x-pack/elastic-agent/pkg/agent/application/local_mode.go b/x-pack/elastic-agent/pkg/agent/application/local_mode.go index ffb59f281ff6..4da6304e1ccf 100644 --- a/x-pack/elastic-agent/pkg/agent/application/local_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/local_mode.go @@ -30,6 +30,7 @@ import ( acker "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi/acker/noop" reporting "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter" logreporter "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter/log" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/sorted" ) type discoverFunc func() ([]string, error) @@ -157,6 +158,11 @@ func newLocal( return localApplication, nil } +// Routes returns a list of routes handled by agent. +func (l *Local) Routes() *sorted.Set { + return l.router.Routes() +} + // Start starts a local agent. func (l *Local) Start() error { l.log.Info("Agent is starting") diff --git a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go index 94a8a826de49..e85034de4d43 100644 --- a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go @@ -43,6 +43,7 @@ import ( reporting "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter" fleetreporter "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter/fleet" logreporter "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter/log" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/sorted" ) type stateStore interface { @@ -278,6 +279,11 @@ func newManaged( return managedApplication, nil } +// Routes returns a list of routes handled by agent. +func (m *Managed) Routes() *sorted.Set { + return m.router.Routes() +} + // Start starts a managed elastic-agent. func (m *Managed) Start() error { m.log.Info("Agent is starting") diff --git a/x-pack/elastic-agent/pkg/agent/application/pipeline/pipeline.go b/x-pack/elastic-agent/pkg/agent/application/pipeline/pipeline.go index 8286c1ee3a45..c8cac5b32162 100644 --- a/x-pack/elastic-agent/pkg/agent/application/pipeline/pipeline.go +++ b/x-pack/elastic-agent/pkg/agent/application/pipeline/pipeline.go @@ -13,6 +13,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/sorted" ) // ConfigHandler is capable of handling configrequest. @@ -30,6 +31,7 @@ type RoutingKey = string // Router is an interace routes programs to correspongind stream type Router interface { + Routes() *sorted.Set Route(id string, grpProg map[RoutingKey][]program.Program) error Shutdown() } diff --git a/x-pack/elastic-agent/pkg/agent/application/pipeline/router/router.go b/x-pack/elastic-agent/pkg/agent/application/pipeline/router/router.go index 6c7a27a2bd99..bda4a7b7cde7 100644 --- a/x-pack/elastic-agent/pkg/agent/application/pipeline/router/router.go +++ b/x-pack/elastic-agent/pkg/agent/application/pipeline/router/router.go @@ -34,6 +34,10 @@ func New(log *logger.Logger, factory pipeline.StreamFunc) (pipeline.Router, erro return &router{log: log, streamFactory: factory, routes: sorted.NewSet()}, nil } +func (r *router) Routes() *sorted.Set { + return r.routes +} + func (r *router) Route(id string, grpProg map[pipeline.RoutingKey][]program.Program) error { s := sorted.NewSet() diff --git a/x-pack/elastic-agent/pkg/agent/application/pipeline/stream/operator_stream.go b/x-pack/elastic-agent/pkg/agent/application/pipeline/stream/operator_stream.go index 519a7b6bb523..57b16bbabc4e 100644 --- a/x-pack/elastic-agent/pkg/agent/application/pipeline/stream/operator_stream.go +++ b/x-pack/elastic-agent/pkg/agent/application/pipeline/stream/operator_stream.go @@ -8,6 +8,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/pipeline" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state" ) type operatorStream struct { @@ -15,10 +16,22 @@ type operatorStream struct { log *logger.Logger } +type stater interface { + State() map[string]state.State +} + func (b *operatorStream) Close() error { return b.configHandler.Close() } +func (b *operatorStream) State() map[string]state.State { + if s, ok := b.configHandler.(stater); ok { + return s.State() + } + + return nil +} + func (b *operatorStream) Execute(cfg configrequest.Request) error { return b.configHandler.HandleConfig(cfg) } diff --git a/x-pack/elastic-agent/pkg/agent/cmd/inspect.go b/x-pack/elastic-agent/pkg/agent/cmd/inspect.go index f13163f27b29..294a8bda284f 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/inspect.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/inspect.go @@ -31,6 +31,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/noop" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/sorted" "github.com/elastic/go-sysinfo" ) @@ -299,6 +300,10 @@ type inmemRouter struct { programs map[string][]program.Program } +func (r *inmemRouter) Routes() *sorted.Set { + return nil +} + func (r *inmemRouter) Route(id string, grpProg map[pipeline.RoutingKey][]program.Program) error { r.programs = grpProg return nil diff --git a/x-pack/elastic-agent/pkg/agent/cmd/run.go b/x-pack/elastic-agent/pkg/agent/cmd/run.go index 75d9e49fad85..2c0bb76cdb0b 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/run.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/run.go @@ -6,14 +6,10 @@ package cmd import ( "context" - "encoding/json" "fmt" - "net/http" "os" "os/signal" "path/filepath" - "runtime" - "strings" "syscall" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" @@ -22,7 +18,6 @@ import ( "github.com/elastic/beats/v7/libbeat/api" "github.com/elastic/beats/v7/libbeat/cmd/instance/metrics" - "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/libbeat/service" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application" @@ -39,6 +34,8 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/beats" + monitoringCfg "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/config" + monitoringServer "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/server" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" ) @@ -157,7 +154,7 @@ func run(streams *cli.IOStreams, override cfgOverrider) error { // Windows: Mark return err } - serverStopFn, err := setupMetrics(agentInfo, logger, cfg.Settings.DownloadConfig.OS()) + serverStopFn, err := setupMetrics(agentInfo, logger, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, app) if err != nil { return err } @@ -265,7 +262,7 @@ func defaultLogLevel(cfg *configuration.Configuration) string { return defaultLogLevel } -func setupMetrics(agentInfo *info.AgentInfo, logger *logger.Logger, operatingSystem string) (func() error, error) { +func setupMetrics(agentInfo *info.AgentInfo, logger *logger.Logger, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, app application.Application) (func() error, error) { // use libbeat to setup metrics if err := metrics.SetupMetrics(agentName); err != nil { return nil, err @@ -274,18 +271,10 @@ func setupMetrics(agentInfo *info.AgentInfo, logger *logger.Logger, operatingSys // start server for stats endpointConfig := api.Config{ Enabled: true, - Host: beats.AgentMonitoringEndpoint(operatingSystem), + Host: beats.AgentMonitoringEndpoint(operatingSystem, cfg.HTTP), } - // create agent config path - createAgentMonitoringDrop(endpointConfig.Host) - - cfg, err := common.NewConfigFrom(endpointConfig) - if err != nil { - return nil, err - } - - s, err := exposeMetricsEndpoint(logger, cfg, monitoring.GetNamespace) + s, err := monitoringServer.New(logger, endpointConfig, monitoring.GetNamespace, app.Routes, isProcessStatsEnabled(cfg.HTTP)) if err != nil { return nil, errors.New(err, "could not start the HTTP server for the API") } @@ -295,55 +284,6 @@ func setupMetrics(agentInfo *info.AgentInfo, logger *logger.Logger, operatingSys return s.Stop, nil } -func createAgentMonitoringDrop(drop string) error { - if drop == "" || runtime.GOOS == "windows" { - return nil - } - - path := strings.TrimPrefix(drop, "unix://") - if strings.HasSuffix(path, ".sock") { - path = filepath.Dir(path) - } - - _, err := os.Stat(path) - if err != nil { - if !os.IsNotExist(err) { - return err - } - - // create - if err := os.MkdirAll(path, 0775); err != nil { - return err - } - } - - return os.Chown(path, os.Geteuid(), os.Getegid()) -} - -func exposeMetricsEndpoint(log *logger.Logger, config *common.Config, ns func(string) *monitoring.Namespace) (*api.Server, error) { - mux := http.NewServeMux() - - makeAPIHandler := func(ns *monitoring.Namespace) func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json; charset=utf-8") - - data := monitoring.CollectStructSnapshot( - ns.GetRegistry(), - monitoring.Full, - false, - ) - - bytes, err := json.Marshal(data) - var content string - if err != nil { - content = fmt.Sprintf("Not valid json: %v", err) - } else { - content = string(bytes) - } - fmt.Fprint(w, content) - } - } - - mux.HandleFunc("/stats", makeAPIHandler(ns("stats"))) - return api.New(log, mux, config) +func isProcessStatsEnabled(cfg *monitoringCfg.MonitoringHTTPConfig) bool { + return cfg != nil && cfg.Enabled } diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go index f5a5f54545db..4fa440df5371 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go +++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go @@ -439,7 +439,7 @@ func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string "namespace": "agent", "period": "10s", "path": "/stats", - "hosts": []string{beats.AgentPrefixedMonitoringEndpoint(o.config.DownloadConfig.OS())}, + "hosts": []string{beats.AgentPrefixedMonitoringEndpoint(o.config.DownloadConfig.OS(), o.config.MonitoringConfig.HTTP)}, "index": fmt.Sprintf("metrics-elastic_agent.%s-default", fixedAgentName), "processors": []map[string]interface{}{ { diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go index 8a25eb809351..cbf9edf3266c 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go +++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go @@ -50,7 +50,7 @@ func TestGenerateSteps(t *testing.T) { for _, tc := range testCases { t.Run(tc.Name, func(t *testing.T) { m := &testMonitor{monitorLogs: tc.Config.MonitorLogs, monitorMetrics: tc.Config.MonitorMetrics} - operator := getMonitorableTestOperator(t, "tests/scripts", m) + operator := getMonitorableTestOperator(t, "tests/scripts", m, tc.Config) steps := operator.generateMonitoringSteps("8.0", sampleOutput) if actualSteps := len(steps); actualSteps != tc.ExpectedSteps { t.Fatalf("invalid number of steps, expected %v, got %v", tc.ExpectedSteps, actualSteps) @@ -100,7 +100,7 @@ func checkStep(t *testing.T, stepName string, expectedOutput interface{}, s conf } } -func getMonitorableTestOperator(t *testing.T, installPath string, m monitoring.Monitor) *Operator { +func getMonitorableTestOperator(t *testing.T, installPath string, m monitoring.Monitor, mcfg *monitoringConfig.MonitoringConfig) *Operator { cfg := &configuration.SettingsConfig{ RetryConfig: &retry.Config{ Enabled: true, @@ -113,6 +113,7 @@ func getMonitorableTestOperator(t *testing.T, installPath string, m monitoring.M InstallPath: installPath, OperatingSystem: "darwin", }, + MonitoringConfig: mcfg, } l := getLogger() diff --git a/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go b/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go index f2066bf04ec6..743d44118d6b 100644 --- a/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go +++ b/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go @@ -76,7 +76,7 @@ func (b *Monitor) WatchLogs() bool { return b.config.Enabled && b.config.Monitor func (b *Monitor) WatchMetrics() bool { return b.config.Enabled && b.config.MonitorMetrics } func (b *Monitor) generateMonitoringEndpoint(spec program.Spec, pipelineID string) string { - return getMonitoringEndpoint(spec, b.operatingSystem, pipelineID) + return MonitoringEndpoint(spec, b.operatingSystem, pipelineID) } func (b *Monitor) generateLoggingFile(spec program.Spec, pipelineID string) string { diff --git a/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go b/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go index b10f0ef82a59..6aa6c471e791 100644 --- a/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go +++ b/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go @@ -11,6 +11,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" + monitoringConfig "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/config" ) const ( @@ -24,9 +25,12 @@ const ( // args: pipeline name, application name agentMbEndpointFileFormatWin = `npipe:///elastic-agent` + // agentMbEndpointHTTP is used with cloud and exposes metrics on http endpoint + agentMbEndpointHTTP = "http://localhost:%d" ) -func getMonitoringEndpoint(spec program.Spec, operatingSystem, pipelineID string) string { +// MonitoringEndpoint is an endpoint where process is exposing its metrics. +func MonitoringEndpoint(spec program.Spec, operatingSystem, pipelineID string) string { if endpoint, ok := spec.MetricEndpoints[operatingSystem]; ok { return endpoint } @@ -54,7 +58,11 @@ func getLoggingFile(spec program.Spec, operatingSystem, installPath, pipelineID } // AgentMonitoringEndpoint returns endpoint with exposed metrics for agent. -func AgentMonitoringEndpoint(operatingSystem string) string { +func AgentMonitoringEndpoint(operatingSystem string, cfg *monitoringConfig.MonitoringHTTPConfig) string { + if cfg != nil && cfg.Enabled { + return fmt.Sprintf(agentMbEndpointHTTP, cfg.Port) + } + if operatingSystem == "windows" { return agentMbEndpointFileFormatWin } @@ -69,6 +77,6 @@ func AgentMonitoringEndpoint(operatingSystem string) string { } // AgentPrefixedMonitoringEndpoint returns endpoint with exposed metrics for agent. -func AgentPrefixedMonitoringEndpoint(operatingSystem string) string { - return httpPlusPrefix + AgentMonitoringEndpoint(operatingSystem) +func AgentPrefixedMonitoringEndpoint(operatingSystem string, cfg *monitoringConfig.MonitoringHTTPConfig) string { + return httpPlusPrefix + AgentMonitoringEndpoint(operatingSystem, cfg) } diff --git a/x-pack/elastic-agent/pkg/core/monitoring/config/config.go b/x-pack/elastic-agent/pkg/core/monitoring/config/config.go index ceb4b3b6a56e..2ce067d4e192 100644 --- a/x-pack/elastic-agent/pkg/core/monitoring/config/config.go +++ b/x-pack/elastic-agent/pkg/core/monitoring/config/config.go @@ -4,11 +4,23 @@ package config +const defaultPort = 6791 + // MonitoringConfig describes a configuration of a monitoring type MonitoringConfig struct { - Enabled bool `yaml:"enabled" config:"enabled"` - MonitorLogs bool `yaml:"logs" config:"logs"` - MonitorMetrics bool `yaml:"metrics" config:"metrics"` + Enabled bool `yaml:"enabled" config:"enabled"` + MonitorLogs bool `yaml:"logs" config:"logs"` + MonitorMetrics bool `yaml:"metrics" config:"metrics"` + HTTP *MonitoringHTTPConfig `yaml:"http" config:"http"` +} + +// MonitoringHTTPConfig is a config defining HTTP endpoint published by agent +// for other processes to watch its metrics. +// Processes are only exposed when HTTP is enabled. +type MonitoringHTTPConfig struct { + Enabled bool `yaml:"enabled" config:"enabled"` + Host string `yaml:"host" config:"host"` + Port int `yaml:"port" config:"port" validate:"min=0,max=65535,nonzero"` } // DefaultConfig creates a config with pre-set default values. @@ -17,5 +29,9 @@ func DefaultConfig() *MonitoringConfig { Enabled: true, MonitorLogs: true, MonitorMetrics: true, + HTTP: &MonitoringHTTPConfig{ + Enabled: false, + Port: defaultPort, + }, } } diff --git a/x-pack/elastic-agent/pkg/core/monitoring/server/handler.go b/x-pack/elastic-agent/pkg/core/monitoring/server/handler.go new file mode 100644 index 000000000000..dfb50d7e0246 --- /dev/null +++ b/x-pack/elastic-agent/pkg/core/monitoring/server/handler.go @@ -0,0 +1,47 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package server + +import ( + "fmt" + "net/http" +) + +type apiError interface { + Status() int +} + +func createHandler(fn func(w http.ResponseWriter, r *http.Request) error) *apiHandler { + return &apiHandler{ + innerFn: fn, + } +} + +type apiHandler struct { + innerFn func(w http.ResponseWriter, r *http.Request) error +} + +// ServeHTTP sets status code based on err returned +func (h *apiHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + err := h.innerFn(w, r) + if err != nil { + writeResponse(w, unexpectedErrorWithReason(err.Error())) + + switch e := err.(type) { + case apiError: + w.WriteHeader(e.Status()) + default: + w.WriteHeader(http.StatusInternalServerError) + + } + } +} + +func unexpectedErrorWithReason(reason string, args ...interface{}) errResponse { + return errResponse{ + Type: errTypeUnexpected, + Reason: fmt.Sprintf(reason, args...), + } +} diff --git a/x-pack/elastic-agent/pkg/core/monitoring/server/process.go b/x-pack/elastic-agent/pkg/core/monitoring/server/process.go new file mode 100644 index 000000000000..753fe3602b59 --- /dev/null +++ b/x-pack/elastic-agent/pkg/core/monitoring/server/process.go @@ -0,0 +1,195 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package server + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "strings" + "syscall" + "time" + + "github.com/gorilla/mux" + + "github.com/elastic/beats/v7/metricbeat/mb/parse" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/beats" +) + +const ( + processIDKey = "processID" + monitoringSuffix = "-monitoring" + separator = "-" + timeout = 10 * time.Second + errTypeUnexpected = "UNEXPECTED" + + httpPlusPrefix = "http+" +) + +var ( + // ErrProgramNotSupported returned when requesting metrics for not supported program. + ErrProgramNotSupported = errors.New("specified program is not supported") + invalidChars = map[rune]struct{}{ + '"': {}, + '<': {}, + '>': {}, + '|': {}, + 0: {}, + ':': {}, + '*': {}, + '?': {}, + '\\': {}, + '/': {}, + ';': {}, + } +) + +func processHandler() func(http.ResponseWriter, *http.Request) error { + return func(w http.ResponseWriter, r *http.Request) error { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + + vars := mux.Vars(r) + id, found := vars[processIDKey] + + if !found { + return errorfWithStatus(http.StatusNotFound, "productID not found") + } + + metricsBytes, statusCode, metricsErr := processMetrics(r.Context(), id) + if metricsErr != nil { + return metricsErr + } + + if statusCode > 0 { + w.WriteHeader(statusCode) + } + + fmt.Fprint(w, string(metricsBytes)) + return nil + } +} + +func processMetrics(ctx context.Context, id string) ([]byte, int, error) { + detail, err := parseID(id) + if err != nil { + return nil, 0, err + } + + endpoint := beats.MonitoringEndpoint(detail.spec, artifact.DefaultConfig().OS(), detail.output) + if !strings.HasPrefix(endpoint, httpPlusPrefix) && !strings.HasPrefix(endpoint, "http") { + // add prefix for npipe and unix + endpoint = httpPlusPrefix + endpoint + } + + if detail.isMonitoring { + endpoint += "_monitor" + } + + hostData, err := parse.ParseURL(endpoint, "http", "", "", "stats", "") + if err != nil { + return nil, 0, errorWithStatus(http.StatusInternalServerError, err) + } + + dialer, err := hostData.Transport.Make(timeout) + if err != nil { + return nil, 0, errorWithStatus(http.StatusInternalServerError, err) + } + + client := http.Client{ + Timeout: timeout, + Transport: &http.Transport{ + Dial: dialer.Dial, + }, + } + + req, err := http.NewRequest("GET", hostData.URI, nil) + if err != nil { + return nil, 0, errorWithStatus( + http.StatusInternalServerError, + fmt.Errorf("fetching metrics failed: %v", err.Error()), + ) + } + + resp, err := client.Do(req.WithContext(ctx)) + if err != nil { + statusCode := http.StatusInternalServerError + if errors.Is(err, syscall.ENOENT) { + statusCode = http.StatusNotFound + } + return nil, 0, errorWithStatus(statusCode, err) + } + defer resp.Body.Close() + + rb, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, 0, errorWithStatus(http.StatusInternalServerError, err) + } + + return rb, resp.StatusCode, nil +} + +func writeResponse(w http.ResponseWriter, c interface{}) { + bytes, err := json.Marshal(c) + if err != nil { + // json marshal failed + fmt.Fprintf(w, "Not valid json: %v", err) + return + } + + fmt.Fprint(w, string(bytes)) + +} + +type programDetail struct { + output string + binaryName string + isMonitoring bool + spec program.Spec +} + +func parseID(id string) (programDetail, error) { + var detail programDetail + if !isIDValid(id) { + return detail, errorfWithStatus(http.StatusBadRequest, "provided ID is not valid") + } + + for p, spec := range program.SupportedMap { + if !strings.HasPrefix(id, p+separator) { + continue + } + + detail.binaryName = p + detail.spec = spec + break + } + + if detail.binaryName == "" { + return detail, errorWithStatus(http.StatusNotFound, ErrProgramNotSupported) + } + + if strings.HasSuffix(id, monitoringSuffix) { + detail.isMonitoring = true + id = strings.TrimSuffix(id, monitoringSuffix) + } + + detail.output = strings.TrimPrefix(id, detail.binaryName+separator) + + return detail, nil +} + +func isIDValid(id string) bool { + for _, c := range id { + if _, found := invalidChars[c]; found { + return false + } + } + + return true +} diff --git a/x-pack/elastic-agent/pkg/core/monitoring/server/process_test.go b/x-pack/elastic-agent/pkg/core/monitoring/server/process_test.go new file mode 100644 index 000000000000..e518322749b8 --- /dev/null +++ b/x-pack/elastic-agent/pkg/core/monitoring/server/process_test.go @@ -0,0 +1,56 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. +package server + +import ( + "net/http" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestParseID(t *testing.T) { + cases := []struct { + Name string + ID string + ExpectedError bool + ExpectedStatusCode int + ExpectedProgram programDetail + }{ + {"path injected id", ".././../etc/passwd", true, http.StatusBadRequest, programDetail{}}, + {"pipe injected id", "first | second", true, http.StatusBadRequest, programDetail{}}, + {"filebeat with suffix", "filebeat;cat demo-default-monitoring", true, http.StatusBadRequest, programDetail{}}, + + {"filebeat correct", "filebeat-default", false, http.StatusBadRequest, programDetail{output: "default", binaryName: "filebeat"}}, + {"filebeat monitor correct", "filebeat-default-monitoring", false, http.StatusBadRequest, programDetail{output: "default", binaryName: "filebeat", isMonitoring: true}}, + + {"mb correct", "metricbeat-default", false, http.StatusBadRequest, programDetail{output: "default", binaryName: "metricbeat"}}, + {"mb monitor correct", "metricbeat-default-monitoring", false, http.StatusBadRequest, programDetail{output: "default", binaryName: "metricbeat", isMonitoring: true}}, + + {"endpoint correct", "endpoint-security-default", false, http.StatusBadRequest, programDetail{output: "default", binaryName: "endpoint-security"}}, + {"endpoint monitor correct", "endpoint-security-default-monitoring", false, http.StatusBadRequest, programDetail{output: "default", binaryName: "endpoint-security", isMonitoring: true}}, + + {"unknown", "unknown-default", true, http.StatusNotFound, programDetail{}}, + {"unknown monitor", "unknown-default-monitoring", true, http.StatusNotFound, programDetail{}}, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + pd, err := parseID(tc.ID) + if !tc.ExpectedError { + require.NoError(t, err) + } + + if tc.ExpectedStatusCode > 0 && tc.ExpectedError { + statErr, ok := err.(apiError) + require.True(t, ok) + require.Equal(t, tc.ExpectedStatusCode, statErr.Status()) + } + + require.Equal(t, tc.ExpectedProgram.binaryName, pd.binaryName) + require.Equal(t, tc.ExpectedProgram.output, pd.output) + require.Equal(t, tc.ExpectedProgram.isMonitoring, pd.isMonitoring) + }) + } +} diff --git a/x-pack/elastic-agent/pkg/core/monitoring/server/processes.go b/x-pack/elastic-agent/pkg/core/monitoring/server/processes.go new file mode 100644 index 000000000000..a298761065c7 --- /dev/null +++ b/x-pack/elastic-agent/pkg/core/monitoring/server/processes.go @@ -0,0 +1,133 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package server + +import ( + "net/http" + "strconv" + "strings" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/sorted" +) + +const ( + configuredType = "configured" + internalType = "internal" +) + +type sourceInfo struct { + // Kind is a kind of process e.g configured or internal + // configured - used for user configured processes + // internal - used for monitoring processes + Kind string `json:"kind"` + + // Outputs process is handling. + Outputs []string `json:"outputs"` +} + +type processInfo struct { + // ID is a unique id of the process. + ID string `json:"id"` + + // PID is a current process ID. + PID string `json:"pid"` + + // Binary name e.g filebeat, this does not contain absolute path. + Binary string `json:"binary"` + + // Source information + Source sourceInfo `json:"source"` +} + +type processesResponse struct { + Processes []processInfo `json:"processes"` +} + +type errResponse struct { + // Type is a type of error + Type string `json:"type"` + + // Reason is a detailed error message + Reason string `json:"reason"` +} + +type stater interface { + State() map[string]state.State +} + +func processesHandler(routesFetchFn func() *sorted.Set) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + + resp := processesResponse{ + Processes: processesFromRoutes(routesFetchFn), + } + + writeResponse(w, resp) + } +} + +func processesFromRoutes(routesFetchFn func() *sorted.Set) []processInfo { + var processes []processInfo + routes := routesFetchFn() + + for _, k := range routes.Keys() { + op, found := routes.Get(k) + if !found { + continue + } + + s, ok := op.(stater) + if !ok { + continue + } + + states := s.State() + + for app, state := range states { + binaryName, isMonitoring := appNameFromDescriptor(app) + appType := configuredType + if isMonitoring { + appType = internalType + } + + var pid int + if state.ProcessInfo != nil { + pid = state.ProcessInfo.PID + } + + processInfo := processInfo{ + ID: processID(k, binaryName, isMonitoring), + PID: strconv.Itoa(pid), + Binary: binaryName, + Source: sourceInfo{ + Kind: appType, + Outputs: []string{k}, + }, + } + + processes = append(processes, processInfo) + } + } + + return processes +} + +func processID(output, binaryName string, isMonitoring bool) string { + id := binaryName + separator + output + if isMonitoring { + return id + monitoringSuffix + } + + return id +} + +func appNameFromDescriptor(d string) (string, bool) { + // monitoring desctiptor contains suffix with tag + // non monitoring just `binaryname--version` + parts := strings.Split(d, "--") + return parts[0], len(parts) > 2 +} diff --git a/x-pack/elastic-agent/pkg/core/monitoring/server/processes_test.go b/x-pack/elastic-agent/pkg/core/monitoring/server/processes_test.go new file mode 100644 index 000000000000..d1039c716433 --- /dev/null +++ b/x-pack/elastic-agent/pkg/core/monitoring/server/processes_test.go @@ -0,0 +1,171 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. +package server + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "os" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/process" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/sorted" +) + +func TestProcesses(t *testing.T) { + testRoutes := func(routes map[string]stater) func() *sorted.Set { + set := sorted.NewSet() + for k, s := range routes { + set.Add(k, s) + } + + return func() *sorted.Set { return set } + } + + t.Run("nothing running", func(t *testing.T) { + r := testRoutes(nil) + w := &testWriter{} + fn := processesHandler(r) + fn(w, nil) + + pr := processesResponse{ + Processes: nil, + } + + assert.Equal(t, 1, len(w.responses)) + if !assert.True(t, jsonComparer(w.responses[0], pr)) { + diff := cmp.Diff(pr, w.responses[0]) + t.Logf("Mismatch (-want, +got)\n%s", diff) + } + }) + + t.Run("process running", func(t *testing.T) { + r := testRoutes(map[string]stater{ + "default": &testStater{ + states: map[string]state.State{ + "filebeat--8.0.0": { + ProcessInfo: &process.Info{ + PID: 123, + Process: &os.Process{ + Pid: 123, + }, + }, + Status: state.Configuring, + }, + }, + }, + }) + w := &testWriter{} + fn := processesHandler(r) + fn(w, nil) + + pr := processesResponse{ + Processes: []processInfo{ + { + ID: "filebeat-default", + PID: "123", + Binary: "filebeat", + Source: sourceInfo{Kind: "configured", Outputs: []string{"default"}}, + }, + }, + } + + assert.Equal(t, 1, len(w.responses)) + if !assert.True(t, jsonComparer(w.responses[0], pr)) { + diff := cmp.Diff(w.responses[0], pr) + t.Logf("Mismatch (-want, +got)\n%s", diff) + } + }) + + t.Run("monitoring running", func(t *testing.T) { + r := testRoutes(map[string]stater{ + "default": &testStater{ + states: map[string]state.State{ + "filebeat--8.0.0--tag": { + ProcessInfo: &process.Info{ + PID: 123, + Process: &os.Process{ + Pid: 123, + }, + }, + Status: state.Configuring, + }, + }, + }, + }) + w := &testWriter{} + fn := processesHandler(r) + fn(w, nil) + + pr := processesResponse{ + Processes: []processInfo{ + { + ID: "filebeat-default-monitoring", + PID: "123", + Binary: "filebeat", + Source: sourceInfo{Kind: "internal", Outputs: []string{"default"}}, + }, + }, + } + + assert.Equal(t, 1, len(w.responses)) + if !assert.True(t, jsonComparer(w.responses[0], pr)) { + diff := cmp.Diff(w.responses[0], pr) + t.Logf("Mismatch (-want, +got)\n%s", diff) + } + }) +} + +type testStater struct { + states map[string]state.State +} + +func (s *testStater) State() map[string]state.State { + return s.states +} + +type testWriter struct { + responses []string + statusCode int +} + +func (w *testWriter) Header() http.Header { + return http.Header{} +} + +func (w *testWriter) Write(r []byte) (int, error) { + if w.responses == nil { + w.responses = make([]string, 0) + } + w.responses = append(w.responses, string(r)) + + return len(r), nil +} + +func (w *testWriter) WriteHeader(statusCode int) { + w.statusCode = statusCode +} + +func jsonComparer(expected string, candidate interface{}) bool { + candidateJSON, err := json.Marshal(&candidate) + if err != nil { + fmt.Println(err) + return false + } + + cbytes := make([]byte, 0, len(candidateJSON)) + bbuf := bytes.NewBuffer(cbytes) + if err := json.Compact(bbuf, candidateJSON); err != nil { + fmt.Println(err) + return false + } + + return bytes.Equal([]byte(expected), bbuf.Bytes()) +} diff --git a/x-pack/elastic-agent/pkg/core/monitoring/server/server.go b/x-pack/elastic-agent/pkg/core/monitoring/server/server.go new file mode 100644 index 000000000000..25703d1524df --- /dev/null +++ b/x-pack/elastic-agent/pkg/core/monitoring/server/server.go @@ -0,0 +1,109 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package server + +import ( + "fmt" + "net/http" + "os" + "path/filepath" + "runtime" + "strings" + + "github.com/gorilla/mux" + + "github.com/elastic/beats/v7/libbeat/api" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/monitoring" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/sorted" +) + +// New creates a new server exposing metrics and process information. +func New( + log *logger.Logger, + endpointConfig api.Config, + ns func(string) *monitoring.Namespace, + routesFetchFn func() *sorted.Set, + enableProcessStats bool, +) (*api.Server, error) { + if err := createAgentMonitoringDrop(endpointConfig.Host); err != nil { + // log but ignore + log.Errorf("failed to create monitoring drop: %v", err) + } + + cfg, err := common.NewConfigFrom(endpointConfig) + if err != nil { + return nil, err + } + + return exposeMetricsEndpoint(log, cfg, ns, routesFetchFn, enableProcessStats) +} + +func exposeMetricsEndpoint(log *logger.Logger, config *common.Config, ns func(string) *monitoring.Namespace, routesFetchFn func() *sorted.Set, enableProcessStats bool) (*api.Server, error) { + r := mux.NewRouter() + r.Handle("/stats", createHandler(statsHandler(ns("stats")))) + + if enableProcessStats { + r.HandleFunc("/processes", processesHandler(routesFetchFn)) + r.Handle("/processes/{processID}", createHandler(processHandler())) + } + + mux := http.NewServeMux() + mux.Handle("/", r) + + return api.New(log, mux, config) +} + +func createAgentMonitoringDrop(drop string) error { + if drop == "" || runtime.GOOS == "windows" { + return nil + } + + path := strings.TrimPrefix(drop, "unix://") + if strings.HasSuffix(path, ".sock") { + path = filepath.Dir(path) + } + + _, err := os.Stat(path) + if err != nil { + if !os.IsNotExist(err) { + return err + } + + // create + if err := os.MkdirAll(path, 0775); err != nil { + return err + } + } + + return os.Chown(path, os.Geteuid(), os.Getegid()) +} + +func errorWithStatus(status int, err error) *statusError { + return &statusError{ + err: err, + status: status, + } +} + +func errorfWithStatus(status int, msg string, args ...string) *statusError { + err := fmt.Errorf(msg, args) + return errorWithStatus(status, err) +} + +// StatusError holds correlation between error and a status +type statusError struct { + err error + status int +} + +func (s *statusError) Status() int { + return s.status +} + +func (s *statusError) Error() string { + return s.err.Error() +} diff --git a/x-pack/elastic-agent/pkg/core/monitoring/server/stats.go b/x-pack/elastic-agent/pkg/core/monitoring/server/stats.go new file mode 100644 index 000000000000..5bb84246e9fd --- /dev/null +++ b/x-pack/elastic-agent/pkg/core/monitoring/server/stats.go @@ -0,0 +1,36 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package server + +import ( + "encoding/json" + "fmt" + "net/http" + + "github.com/elastic/beats/v7/libbeat/monitoring" +) + +func statsHandler(ns *monitoring.Namespace) func(http.ResponseWriter, *http.Request) error { + return func(w http.ResponseWriter, r *http.Request) error { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + + data := monitoring.CollectStructSnapshot( + ns.GetRegistry(), + monitoring.Full, + false, + ) + + bytes, err := json.Marshal(data) + var content string + if err != nil { + content = fmt.Sprintf("Not valid json: %v", err) + } else { + content = string(bytes) + } + fmt.Fprint(w, content) + + return nil + } +}