diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index bf00a646531..8a90c76fc82 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -51,6 +51,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add `nodes` to filebeat-kubernetes.yaml ClusterRole. {issue}24051[24051] {pull}24052[24052] *Heartbeat* +- Adds negative body match. {pull}20728[20728] +- Refactor synthetics configuration to new syntax. {pull}23467[23467] *Journalbeat* diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 171bdfe5fdb..85bfb79fe20 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -18,7 +18,6 @@ package beater import ( - "context" "fmt" "time" @@ -83,10 +82,11 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { func (bt *Heartbeat) Run(b *beat.Beat) error { logp.Info("heartbeat is running! Hit CTRL-C to stop it.") - err := bt.RunStaticMonitors(b) + stopStaticMonitors, err := bt.RunStaticMonitors(b) if err != nil { return err } + defer stopStaticMonitors() if b.Manager.Enabled() { bt.RunCentralMgmtMonitors(b) @@ -102,13 +102,6 @@ func (bt *Heartbeat) Run(b *beat.Beat) error { } } - if len(bt.config.SyntheticSuites) > 0 { - err := bt.RunSyntheticSuiteMonitors(b) - if err != nil { - return err - } - } - if bt.config.Autodiscover != nil { bt.autodiscover, err = bt.makeAutodiscover(b) if err != nil { @@ -131,9 +124,10 @@ func (bt *Heartbeat) Run(b *beat.Beat) error { } // RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present. -func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) error { +func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) (stop func(), err error) { factory := monitors.NewFactory(b.Info, bt.scheduler, true) + var runners []cfgfile.Runner for _, cfg := range bt.config.Monitors { created, err := factory.Create(b.Publisher, cfg) if err != nil { @@ -141,12 +135,19 @@ func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) error { continue // don't stop loading monitors just because they're disabled } - return errors.Wrap(err, "could not create monitor") + return nil, errors.Wrap(err, "could not create monitor") } created.Start() + runners = append(runners, created) } - return nil + + stop = func() { + for _, runner := range runners { + runner.Stop() + } + } + return stop, nil } // RunCentralMgmtMonitors loads any central management configured configs. @@ -170,50 +171,6 @@ func (bt *Heartbeat) RunReloadableMonitors(b *beat.Beat) (err error) { return nil } -// Provide hook to define journey list discovery from x-pack -type JourneyLister func(ctx context.Context, suiteFile string, params common.MapStr) ([]string, error) - -var mainJourneyLister JourneyLister - -func RegisterJourneyLister(jl JourneyLister) { - mainJourneyLister = jl -} - -func (bt *Heartbeat) RunSyntheticSuiteMonitors(b *beat.Beat) error { - // If we are running without XPack this will be nil - if mainJourneyLister == nil { - return nil - } - for _, suite := range bt.config.SyntheticSuites { - logp.Info("Listing suite %s", suite.Path) - journeyNames, err := mainJourneyLister(context.TODO(), suite.Path, suite.Params) - if err != nil { - return err - } - factory := monitors.NewFactory(b.Info, bt.scheduler, false) - for _, name := range journeyNames { - cfg, err := common.NewConfigFrom(map[string]interface{}{ - "type": "browser", - "path": suite.Path, - "schedule": suite.Schedule, - "params": suite.Params, - "journey_name": name, - "name": name, - "id": name, - }) - if err != nil { - return err - } - created, err := factory.Create(b.Publisher, cfg) - if err != nil { - return errors.Wrap(err, "could not create monitor") - } - created.Start() - } - } - return nil -} - // makeAutodiscover creates an autodiscover object ready to be started. func (bt *Heartbeat) makeAutodiscover(b *beat.Beat) (*autodiscover.Autodiscover, error) { autodiscover, err := autodiscover.NewAutodiscover( diff --git a/heartbeat/config/config.go b/heartbeat/config/config.go index bfee8096f39..052d472ea39 100644 --- a/heartbeat/config/config.go +++ b/heartbeat/config/config.go @@ -32,7 +32,7 @@ type Config struct { ConfigMonitors *common.Config `config:"config.monitors"` Scheduler Scheduler `config:"scheduler"` Autodiscover *autodiscover.Config `config:"autodiscover"` - SyntheticSuites []*SyntheticSuite `config:"synthetic_suites"` + SyntheticSuites []*common.Config `config:"synthetic_suites"` } // Scheduler defines the syntax of a heartbeat.yml scheduler block. @@ -41,12 +41,5 @@ type Scheduler struct { Location string `config:"location"` } -type SyntheticSuite struct { - Path string `config:"path"` - Name string `config:"id_prefix"` - Schedule string `config:"schedule"` - Params map[string]interface{} `config:"params"` -} - // DefaultConfig is the canonical instantiation of Config. var DefaultConfig = Config{} diff --git a/heartbeat/monitors/active/http/http.go b/heartbeat/monitors/active/http/http.go index 74463663567..214ed8519d8 100644 --- a/heartbeat/monitors/active/http/http.go +++ b/heartbeat/monitors/active/http/http.go @@ -22,7 +22,8 @@ import ( "net/http" "net/url" - "github.com/elastic/beats/v7/heartbeat/monitors" + "github.com/elastic/beats/v7/heartbeat/monitors/plugin" + "github.com/elastic/beats/v7/heartbeat/monitors/jobs" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" "github.com/elastic/beats/v7/libbeat/common" @@ -32,8 +33,7 @@ import ( ) func init() { - monitors.RegisterActive("http", create) - monitors.RegisterActive("synthetics/http", create) + plugin.Register("http", create, "synthetics/http") } var debugf = logp.MakeDebug("http") @@ -42,15 +42,15 @@ var debugf = logp.MakeDebug("http") func create( name string, cfg *common.Config, -) (js []jobs.Job, endpoints int, err error) { +) (p plugin.Plugin, err error) { config := defaultConfig if err := cfg.Unpack(&config); err != nil { - return nil, 0, err + return plugin.Plugin{}, err } tls, err := tlscommon.LoadTLSConfig(config.TLS) if err != nil { - return nil, 0, err + return plugin.Plugin{}, err } var body []byte @@ -61,13 +61,13 @@ func create( compression := config.Check.Request.Compression enc, err = getContentEncoder(compression.Type, compression.Level) if err != nil { - return nil, 0, err + return plugin.Plugin{}, err } buf := bytes.NewBuffer(nil) err = enc.Encode(buf, bytes.NewBufferString(config.Check.Request.SendBody)) if err != nil { - return nil, 0, err + return plugin.Plugin{}, err } body = buf.Bytes() @@ -75,7 +75,7 @@ func create( validator, err := makeValidateResponse(&config.Check.Response) if err != nil { - return nil, 0, err + return plugin.Plugin{}, err } // Determine whether we're using a proxy or not and then use that to figure out how to @@ -87,7 +87,7 @@ func create( if config.ProxyURL != "" || config.MaxRedirects > 0 { transport, err := newRoundTripper(&config, tls) if err != nil { - return nil, 0, err + return plugin.Plugin{}, err } makeJob = func(urlStr string) (jobs.Job, error) { @@ -99,16 +99,16 @@ func create( } } - js = make([]jobs.Job, len(config.Hosts)) + js := make([]jobs.Job, len(config.Hosts)) for i, urlStr := range config.Hosts { u, _ := url.Parse(urlStr) if err != nil { - return nil, 0, err + return plugin.Plugin{}, err } job, err := makeJob(urlStr) if err != nil { - return nil, 0, err + return plugin.Plugin{}, err } // Assign any execution errors to the error field and @@ -116,7 +116,7 @@ func create( js[i] = wrappers.WithURLField(u, job) } - return js, len(config.Hosts), nil + return plugin.Plugin{Jobs: js, Close: nil, Endpoints: len(config.Hosts)}, nil } func newRoundTripper(config *Config, tls *tlscommon.TLSConfig) (*http.Transport, error) { diff --git a/heartbeat/monitors/active/http/http_test.go b/heartbeat/monitors/active/http/http_test.go index cd2c03dee3f..6fcd3c20a43 100644 --- a/heartbeat/monitors/active/http/http_test.go +++ b/heartbeat/monitors/active/http/http_test.go @@ -78,17 +78,17 @@ func sendTLSRequest(t *testing.T, testURL string, useUrls bool, extraConfig map[ config, err := common.NewConfigFrom(configSrc) require.NoError(t, err) - jobs, endpoints, err := create("tls", config) + p, err := create("tls", config) require.NoError(t, err) sched := schedule.MustParse("@every 1s") - job := wrappers.WrapCommon(jobs, stdfields.StdMonitorFields{ID: "tls", Type: "http", Schedule: sched, Timeout: 1})[0] + job := wrappers.WrapCommon(p.Jobs, stdfields.StdMonitorFields{ID: "tls", Type: "http", Schedule: sched, Timeout: 1})[0] event := &beat.Event{} _, err = job(event) require.NoError(t, err) - require.Equal(t, 1, endpoints) + require.Equal(t, 1, p.Endpoints) return event } @@ -318,11 +318,11 @@ func TestLargeResponse(t *testing.T) { config, err := common.NewConfigFrom(configSrc) require.NoError(t, err) - jobs, _, err := create("largeresp", config) + p, err := create("largeresp", config) require.NoError(t, err) sched, _ := schedule.Parse("@every 1s") - job := wrappers.WrapCommon(jobs, stdfields.StdMonitorFields{ID: "test", Type: "http", Schedule: sched, Timeout: 1})[0] + job := wrappers.WrapCommon(p.Jobs, stdfields.StdMonitorFields{ID: "test", Type: "http", Schedule: sched, Timeout: 1})[0] event := &beat.Event{} _, err = job(event) @@ -532,11 +532,11 @@ func TestRedirect(t *testing.T) { config, err := common.NewConfigFrom(configSrc) require.NoError(t, err) - jobs, _, err := create("redirect", config) + p, err := create("redirect", config) require.NoError(t, err) sched, _ := schedule.Parse("@every 1s") - job := wrappers.WrapCommon(jobs, stdfields.StdMonitorFields{ID: "test", Type: "http", Schedule: sched, Timeout: 1})[0] + job := wrappers.WrapCommon(p.Jobs, stdfields.StdMonitorFields{ID: "test", Type: "http", Schedule: sched, Timeout: 1})[0] // Run this test multiple times since in the past we had an issue where the redirects // list was added onto by each request. See https://github.com/elastic/beats/pull/15944 @@ -579,11 +579,11 @@ func TestNoHeaders(t *testing.T) { config, err := common.NewConfigFrom(configSrc) require.NoError(t, err) - jobs, _, err := create("http", config) + p, err := create("http", config) require.NoError(t, err) sched, _ := schedule.Parse("@every 1s") - job := wrappers.WrapCommon(jobs, stdfields.StdMonitorFields{ID: "test", Type: "http", Schedule: sched, Timeout: 1})[0] + job := wrappers.WrapCommon(p.Jobs, stdfields.StdMonitorFields{ID: "test", Type: "http", Schedule: sched, Timeout: 1})[0] event := &beat.Event{} _, err = job(event) diff --git a/heartbeat/monitors/active/icmp/icmp.go b/heartbeat/monitors/active/icmp/icmp.go index f9119ab19ec..1315a1dddf0 100644 --- a/heartbeat/monitors/active/icmp/icmp.go +++ b/heartbeat/monitors/active/icmp/icmp.go @@ -22,6 +22,8 @@ import ( "net" "net/url" + "github.com/elastic/beats/v7/heartbeat/monitors/plugin" + "github.com/elastic/beats/v7/heartbeat/eventext" "github.com/elastic/beats/v7/heartbeat/look" "github.com/elastic/beats/v7/heartbeat/monitors" @@ -35,30 +37,29 @@ import ( var debugf = logp.MakeDebug("icmp") func init() { - monitors.RegisterActive("icmp", create) - monitors.RegisterActive("synthetics/icmp", create) + plugin.Register("icmp", create, "synthetics/icmp") } func create( name string, commonConfig *common.Config, -) (jobs []jobs.Job, endpoints int, err error) { +) (p plugin.Plugin, err error) { loop, err := getStdLoop() if err != nil { logp.Warn("Failed to initialize ICMP loop %v", err) - return nil, 0, err + return plugin.Plugin{}, err } config := DefaultConfig if err := commonConfig.Unpack(&config); err != nil { - return nil, 0, err + return plugin.Plugin{}, err } jf, err := newJobFactory(config, monitors.NewStdResolver(), loop) if err != nil { - return nil, 0, err + return plugin.Plugin{}, err } - return jf.makeJobs() + return jf.makePlugin() } @@ -89,29 +90,30 @@ func (jf *jobFactory) checkConfig() error { return nil } -func (jf *jobFactory) makeJobs() (j []jobs.Job, endpoints int, err error) { +func (jf *jobFactory) makePlugin() (plugin2 plugin.Plugin, err error) { if err := jf.loop.checkNetworkMode(jf.ipVersion); err != nil { - return nil, 0, err + return plugin.Plugin{}, err } pingFactory := jf.pingIPFactory(&jf.config) + var j []jobs.Job for _, host := range jf.config.Hosts { job, err := monitors.MakeByHostJob(host, jf.config.Mode, monitors.NewStdResolver(), pingFactory) if err != nil { - return nil, 0, err + return plugin.Plugin{}, err } u, err := url.Parse(fmt.Sprintf("icmp://%s", host)) if err != nil { - return nil, 0, err + return plugin.Plugin{}, err } j = append(j, wrappers.WithURLField(u, job)) } - return j, len(jf.config.Hosts), nil + return plugin.Plugin{Jobs: j, Close: nil, Endpoints: len(jf.config.Hosts)}, nil } func (jf *jobFactory) pingIPFactory(config *Config) func(*net.IPAddr) jobs.Job { diff --git a/heartbeat/monitors/active/icmp/icmp_test.go b/heartbeat/monitors/active/icmp/icmp_test.go index 955520b81ba..2fd654648db 100644 --- a/heartbeat/monitors/active/icmp/icmp_test.go +++ b/heartbeat/monitors/active/icmp/icmp_test.go @@ -65,12 +65,12 @@ func execTestICMPCheck(t *testing.T, cfg Config) (mockLoop, *beat.Event) { tl := mockLoop{pingRtt: time.Microsecond * 1000, pingRequests: 1} jf, err := newJobFactory(cfg, monitors.NewStdResolver(), tl) require.NoError(t, err) - j, endpoints, err := jf.makeJobs() - require.Len(t, j, 1) - require.Equal(t, 1, endpoints) + p, err := jf.makePlugin() + require.Len(t, p.Jobs, 1) + require.Equal(t, 1, p.Endpoints) e := &beat.Event{} sched, _ := schedule.Parse("@every 1s") - wrapped := wrappers.WrapCommon(j, stdfields.StdMonitorFields{ID: "test", Type: "icmp", Schedule: sched, Timeout: 1}) + wrapped := wrappers.WrapCommon(p.Jobs, stdfields.StdMonitorFields{ID: "test", Type: "icmp", Schedule: sched, Timeout: 1}) wrapped[0](e) return tl, e } diff --git a/heartbeat/monitors/active/tcp/helpers_test.go b/heartbeat/monitors/active/tcp/helpers_test.go index ea3a22b2888..b5c7aa077f3 100644 --- a/heartbeat/monitors/active/tcp/helpers_test.go +++ b/heartbeat/monitors/active/tcp/helpers_test.go @@ -38,17 +38,17 @@ func testTCPConfigCheck(t *testing.T, configMap common.MapStr, host string, port config, err := common.NewConfigFrom(configMap) require.NoError(t, err) - jobs, endpoints, err := create("tcp", config) + p, err := create("tcp", config) require.NoError(t, err) sched := schedule.MustParse("@every 1s") - job := wrappers.WrapCommon(jobs, stdfields.StdMonitorFields{ID: "test", Type: "tcp", Schedule: sched, Timeout: 1})[0] + job := wrappers.WrapCommon(p.Jobs, stdfields.StdMonitorFields{ID: "test", Type: "tcp", Schedule: sched, Timeout: 1})[0] event := &beat.Event{} _, err = job(event) require.NoError(t, err) - require.Equal(t, 1, endpoints) + require.Equal(t, 1, p.Endpoints) return event } diff --git a/heartbeat/monitors/active/tcp/tcp.go b/heartbeat/monitors/active/tcp/tcp.go index 6be682ee560..aeaebe79a55 100644 --- a/heartbeat/monitors/active/tcp/tcp.go +++ b/heartbeat/monitors/active/tcp/tcp.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/v7/heartbeat/monitors/active/dialchain" "github.com/elastic/beats/v7/heartbeat/monitors/active/dialchain/tlsmeta" "github.com/elastic/beats/v7/heartbeat/monitors/jobs" + "github.com/elastic/beats/v7/heartbeat/monitors/plugin" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" "github.com/elastic/beats/v7/heartbeat/reason" "github.com/elastic/beats/v7/libbeat/beat" @@ -39,8 +40,7 @@ import ( ) func init() { - monitors.RegisterActive("tcp", create) - monitors.RegisterActive("synthetics/tcp", create) + plugin.Register("tcp", create, "synthetics/tcp") } var debugf = logp.MakeDebug("tcp") @@ -48,7 +48,7 @@ var debugf = logp.MakeDebug("tcp") func create( name string, cfg *common.Config, -) (jobs []jobs.Job, endpoints int, err error) { +) (p plugin.Plugin, err error) { return createWithResolver(cfg, monitors.NewStdResolver()) } @@ -57,18 +57,18 @@ func create( func createWithResolver( cfg *common.Config, resolver monitors.Resolver, -) (jobs []jobs.Job, endpoints int, err error) { +) (p plugin.Plugin, err error) { jc, err := newJobFactory(cfg, resolver) if err != nil { - return nil, 0, err + return plugin.Plugin{}, err } - jobs, err = jc.makeJobs() + js, err := jc.makeJobs() if err != nil { - return nil, 0, err + return plugin.Plugin{}, err } - return jobs, len(jc.endpoints), nil + return plugin.Plugin{Jobs: js, Close: nil, Endpoints: len(jc.endpoints)}, nil } // jobFactory is where most of the logic here lives. It provides a common context around diff --git a/heartbeat/monitors/active/tcp/tls_test.go b/heartbeat/monitors/active/tcp/tls_test.go index 88c539ee7e7..62477ed2f56 100644 --- a/heartbeat/monitors/active/tcp/tls_test.go +++ b/heartbeat/monitors/active/tcp/tls_test.go @@ -184,17 +184,17 @@ func testTLSTCPCheck(t *testing.T, host string, port uint16, certFileName string }) require.NoError(t, err) - jobs, endpoints, err := createWithResolver(config, resolver) + p, err := createWithResolver(config, resolver) require.NoError(t, err) sched := schedule.MustParse("@every 1s") - job := wrappers.WrapCommon(jobs, stdfields.StdMonitorFields{ID: "test", Type: "tcp", Schedule: sched, Timeout: 1})[0] + job := wrappers.WrapCommon(p.Jobs, stdfields.StdMonitorFields{ID: "test", Type: "tcp", Schedule: sched, Timeout: 1})[0] event := &beat.Event{} _, err = job(event) require.NoError(t, err) - require.Equal(t, 1, endpoints) + require.Equal(t, 1, p.Endpoints) return event } diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index 5e54eca78fe..7e4da00f82d 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -20,6 +20,7 @@ package monitors import ( "fmt" + "github.com/elastic/beats/v7/heartbeat/monitors/plugin" "github.com/elastic/beats/v7/heartbeat/scheduler" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" @@ -77,13 +78,13 @@ func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config) (cfgfile.Runne } p = pipetool.WithClientConfigEdit(p, configEditor) - monitor, err := newMonitor(c, globalPluginsReg, p, f.sched, f.allowWatches) + monitor, err := newMonitor(c, plugin.GlobalPluginsReg, p, f.sched, f.allowWatches) return monitor, err } // CheckConfig checks to see if the given monitor config is valid. func (f *RunnerFactory) CheckConfig(config *common.Config) error { - return checkMonitorConfig(config, globalPluginsReg, f.allowWatches) + return checkMonitorConfig(config, plugin.GlobalPluginsReg, f.allowWatches) } func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.ConfigEditor, error) { diff --git a/heartbeat/monitors/mocks_test.go b/heartbeat/monitors/mocks_test.go index 629eeb43a43..fecf8a53f59 100644 --- a/heartbeat/monitors/mocks_test.go +++ b/heartbeat/monitors/mocks_test.go @@ -29,8 +29,10 @@ import ( "github.com/elastic/beats/v7/heartbeat/hbtest" "github.com/elastic/beats/v7/heartbeat/hbtestllext" "github.com/elastic/beats/v7/heartbeat/monitors/jobs" + "github.com/elastic/beats/v7/heartbeat/monitors/plugin" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/go-lookslike" "github.com/elastic/go-lookslike/isdef" @@ -125,7 +127,7 @@ func mockEventCustomFields() map[string]interface{} { return common.MapStr{"foo": "bar"} } -func createMockJob(name string, cfg *common.Config) ([]jobs.Job, error) { +func createMockJob() ([]jobs.Job, error) { j := jobs.MakeSimpleJob(func(event *beat.Event) error { eventext.MergeEventFields(event, mockEventCustomFields()) return nil @@ -134,28 +136,42 @@ func createMockJob(name string, cfg *common.Config) ([]jobs.Job, error) { return []jobs.Job{j}, nil } -func mockPluginBuilder() pluginBuilder { +func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int, *atomic.Int) { reg := monitoring.NewRegistry() - return pluginBuilder{"test", ActiveMonitor, func(s string, config *common.Config) ([]jobs.Job, int, error) { - // Declare a real config block with a required attr so we can see what happens when it doesn't work - unpacked := struct { - URLs []string `config:"urls" validate:"required"` - }{} - err := config.Unpack(&unpacked) - if err != nil { - return nil, 0, err - } - c := common.Config{} - j, err := createMockJob("test", &c) - return j, 1, err - }, newPluginCountersRecorder("test", reg)} -} - -func mockPluginsReg() *pluginsReg { - reg := newPluginsReg() - reg.add(mockPluginBuilder()) - return reg + built := atomic.NewInt(0) + closed := atomic.NewInt(0) + + return plugin.PluginFactory{ + Name: "test", + Aliases: []string{"testAlias"}, + Builder: func(s string, config *common.Config) (plugin.Plugin, error) { + built.Inc() + // Declare a real config block with a required attr so we can see what happens when it doesn't work + unpacked := struct { + URLs []string `config:"urls" validate:"required"` + }{} + err := config.Unpack(&unpacked) + if err != nil { + return plugin.Plugin{}, err + } + j, err := createMockJob() + closer := func() error { + closed.Inc() + return nil + } + return plugin.Plugin{Jobs: j, Close: closer, Endpoints: 1}, err + }, + Stats: plugin.NewPluginCountersRecorder("test", reg)}, + built, + closed +} + +func mockPluginsReg() (p *plugin.PluginsReg, built *atomic.Int, closed *atomic.Int) { + reg := plugin.NewPluginsReg() + builder, built, closed := mockPluginBuilder() + reg.Add(builder) + return reg, built, closed } func mockPluginConf(t *testing.T, id string, schedule string, url string) *common.Config { diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go index 66e7317482f..4003cacdf42 100644 --- a/heartbeat/monitors/monitor.go +++ b/heartbeat/monitors/monitor.go @@ -23,12 +23,12 @@ import ( "fmt" "sync" - "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" - "github.com/mitchellh/hashstructure" "github.com/pkg/errors" "github.com/elastic/beats/v7/heartbeat/monitors/jobs" + "github.com/elastic/beats/v7/heartbeat/monitors/plugin" + "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" "github.com/elastic/beats/v7/heartbeat/scheduler" "github.com/elastic/beats/v7/heartbeat/watcher" @@ -43,7 +43,7 @@ type Monitor struct { stdFields stdfields.StdMonitorFields pluginName string config *common.Config - registrar *pluginsReg + registrar *plugin.PluginsReg uniqueName string scheduler *scheduler.Scheduler configuredJobs []*configuredJob @@ -53,6 +53,7 @@ type Monitor struct { // internalsMtx is used to synchronize access to critical // internal datastructures internalsMtx sync.Mutex + close func() error // Watch related fields watchPollTasks []*configuredJob @@ -62,7 +63,7 @@ type Monitor struct { // stats is the countersRecorder used to record lifecycle events // for global metrics + telemetry - stats registryRecorder + stats plugin.RegistryRecorder } // String prints a description of the monitor in a threadsafe way. It is important that this use threadsafe @@ -71,7 +72,7 @@ func (m *Monitor) String() string { return fmt.Sprintf("Monitor", m.stdFields.Name, m.enabled) } -func checkMonitorConfig(config *common.Config, registrar *pluginsReg, allowWatches bool) error { +func checkMonitorConfig(config *common.Config, registrar *plugin.PluginsReg, allowWatches bool) error { m, err := newMonitor(config, registrar, nil, nil, allowWatches) if m != nil { m.Stop() // Stop the monitor to free up the ID from uniqueness checks @@ -96,7 +97,7 @@ func (e ErrDuplicateMonitorID) Error() string { // newMonitor Creates a new monitor, without leaking resources in the event of an error. func newMonitor( config *common.Config, - registrar *pluginsReg, + registrar *plugin.PluginsReg, pipelineConnector beat.PipelineConnector, scheduler *scheduler.Scheduler, allowWatches bool, @@ -112,7 +113,7 @@ func newMonitor( // error without freeing monitor resources. m.Stop() must always be called on a non-nil monitor to free resources. func newMonitorUnsafe( config *common.Config, - registrar *pluginsReg, + registrar *plugin.PluginsReg, pipelineConnector beat.PipelineConnector, scheduler *scheduler.Scheduler, allowWatches bool, @@ -120,26 +121,26 @@ func newMonitorUnsafe( // Extract just the Id, Type, and Enabled fields from the config // We'll parse things more precisely later once we know what exact type of // monitor we have - stdFields, err := stdfields.ConfigToStdMonitorFields(config) + standardFields, err := stdfields.ConfigToStdMonitorFields(config) if err != nil { return nil, err } - monitorPlugin, found := registrar.get(stdFields.Type) + pluginFactory, found := registrar.Get(standardFields.Type) if !found { - return nil, fmt.Errorf("monitor type %v does not exist, valid types are %v", stdFields.Type, registrar.monitorNames()) + return nil, fmt.Errorf("monitor type %v does not exist, valid types are %v", standardFields.Type, registrar.MonitorNames()) } m := &Monitor{ - stdFields: stdFields, - pluginName: monitorPlugin.name, + stdFields: standardFields, + pluginName: pluginFactory.Name, scheduler: scheduler, configuredJobs: []*configuredJob{}, pipelineConnector: pipelineConnector, watchPollTasks: []*configuredJob{}, internalsMtx: sync.Mutex{}, config: config, - stats: monitorPlugin.stats, + stats: pluginFactory.Stats, } if m.stdFields.ID != "" { @@ -156,9 +157,10 @@ func newMonitorUnsafe( m.stdFields.ID = fmt.Sprintf("auto-%s-%#X", m.stdFields.Type, hash) } - rawJobs, endpoints, err := monitorPlugin.create(config) - wrappedJobs := wrappers.WrapCommon(rawJobs, m.stdFields) - m.endpoints = endpoints + p, err := pluginFactory.Create(config) + m.close = p.Close + wrappedJobs := wrappers.WrapCommon(p.Jobs, m.stdFields) + m.endpoints = p.Endpoints if err != nil { return m, fmt.Errorf("job err %v", err) @@ -169,7 +171,7 @@ func newMonitorUnsafe( return m, err } - err = m.makeWatchTasks(monitorPlugin) + err = m.makeWatchTasks(pluginFactory) if err != nil { return m, err } @@ -225,7 +227,7 @@ func (m *Monitor) makeTasks(config *common.Config, jobs []jobs.Job) ([]*configur return mTasks, nil } -func (m *Monitor) makeWatchTasks(monitorPlugin pluginBuilder) error { +func (m *Monitor) makeWatchTasks(pluginFactory plugin.PluginFactory) error { watchCfg := watcher.DefaultWatchConfig err := m.config.Unpack(&watchCfg) if err != nil { @@ -257,13 +259,14 @@ func (m *Monitor) makeWatchTasks(monitorPlugin pluginBuilder) error { return } - watchJobs, endpoints, err := monitorPlugin.create(merged) - m.endpoints = endpoints + p, err := pluginFactory.Create(merged) + m.close = p.Close + m.endpoints = p.Endpoints if err != nil { logp.Err("Could not create job from watch file: %v", err) } - watchTasks, err := m.makeTasks(merged, watchJobs) + watchTasks, err := m.makeTasks(merged, p.Jobs) if err != nil { logp.Err("Could not make configuredJob for config: %v", err) return @@ -305,7 +308,7 @@ func (m *Monitor) Start() { t.Start() } - m.stats.startMonitor(int64(m.endpoints)) + m.stats.StartMonitor(int64(m.endpoints)) } // Stop stops the Monitor's execution in its configured scheduler. @@ -323,7 +326,14 @@ func (m *Monitor) Stop() { t.Stop() } - m.stats.stopMonitor(int64(m.endpoints)) + if m.close != nil { + err := m.close() + if err != nil { + logp.Error(fmt.Errorf("error closing monitor %s: %w", m.String(), err)) + } + } + + m.stats.StopMonitor(int64(m.endpoints)) } func (m *Monitor) freeID() { diff --git a/heartbeat/monitors/monitor_test.go b/heartbeat/monitors/monitor_test.go index 341839e382d..b0d02f819f6 100644 --- a/heartbeat/monitors/monitor_test.go +++ b/heartbeat/monitors/monitor_test.go @@ -21,19 +21,17 @@ import ( "testing" "time" - "github.com/elastic/beats/v7/libbeat/monitoring" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/elastic/go-lookslike/testslike" - "github.com/elastic/beats/v7/heartbeat/scheduler" + "github.com/elastic/beats/v7/libbeat/monitoring" + "github.com/elastic/go-lookslike/testslike" ) func TestMonitor(t *testing.T) { serverMonConf := mockPluginConf(t, "", "@every 1ms", "http://example.net") - reg := mockPluginsReg() + reg, built, closed := mockPluginsReg() pipelineConnector := &MockPipelineConnector{} sched := scheduler.New(1, monitoring.NewRegistry()) @@ -57,7 +55,6 @@ func TestMonitor(t *testing.T) { if count >= 1 { success = true - mon.Stop() pcClient.Close() for _, event := range pcClient.Publishes() { @@ -74,14 +71,17 @@ func TestMonitor(t *testing.T) { t.Fatalf("No publishes detected!") } + assert.Equal(t, 1, built.Load()) mon.Stop() + + assert.Equal(t, 1, closed.Load()) assert.Equal(t, true, pcClient.closed) } func TestDuplicateMonitorIDs(t *testing.T) { serverMonConf := mockPluginConf(t, "custom", "@every 1ms", "http://example.net") badConf := mockBadPluginConf(t, "custom", "@every 1ms") - reg := mockPluginsReg() + reg, built, closed := mockPluginsReg() pipelineConnector := &MockPipelineConnector{} sched := scheduler.New(1, monitoring.NewRegistry()) @@ -102,15 +102,22 @@ func TestDuplicateMonitorIDs(t *testing.T) { require.NoError(t, m1Err) _, m2Err := makeTestMon() require.Error(t, m2Err) - m1.Stop() - _, m3Err := makeTestMon() + m3, m3Err := makeTestMon() + require.NoError(t, m3Err) + m3.Stop() + + // We count 3 because built doesn't count successful builds, + // just attempted creations of monitors + require.Equal(t, 3, built.Load()) + // Only one stops because the others errored on create + require.Equal(t, 2, closed.Load()) require.NoError(t, m3Err) } func TestCheckInvalidConfig(t *testing.T) { serverMonConf := mockInvalidPluginConf(t) - reg := mockPluginsReg() + reg, built, closed := mockPluginsReg() pipelineConnector := &MockPipelineConnector{} sched := scheduler.New(1, monitoring.NewRegistry()) @@ -122,5 +129,9 @@ func TestCheckInvalidConfig(t *testing.T) { // This could change if we decide the contract for newMonitor should always return a monitor require.Nil(t, m, "For this test to work we need a nil value for the monitor.") + // These counters are both zero since this fails at config parse time + require.Equal(t, 0, built.Load()) + require.Equal(t, 0, closed.Load()) + require.Error(t, checkMonitorConfig(serverMonConf, reg, false)) } diff --git a/heartbeat/monitors/plugin.go b/heartbeat/monitors/plugin/plugin.go similarity index 58% rename from heartbeat/monitors/plugin.go rename to heartbeat/monitors/plugin/plugin.go index a75f8990477..73335ca5e29 100644 --- a/heartbeat/monitors/plugin.go +++ b/heartbeat/monitors/plugin/plugin.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package monitors +package plugin import ( "errors" @@ -29,11 +29,19 @@ import ( "github.com/elastic/beats/v7/libbeat/plugin" ) -type pluginBuilder struct { - name string - typ Type - builder PluginBuilder - stats registryRecorder +type PluginFactory struct { + Name string + Aliases []string + Builder PluginFactoryCreate + Stats RegistryRecorder +} + +type PluginFactoryCreate func(string, *common.Config) (p Plugin, err error) + +type Plugin struct { + Jobs []jobs.Job + Close func() error + Endpoints int } var pluginKey = "heartbeat.monitor" @@ -41,13 +49,13 @@ var pluginKey = "heartbeat.monitor" // stateGlobalRecorder records statistics across all plugin types var stateGlobalRecorder = newRootGaugeRecorder(hbregistry.TelemetryRegistry) -func statsForPlugin(pluginName string) registryRecorder { - return multiRegistryRecorder{ - recorders: []registryRecorder{ +func statsForPlugin(pluginName string) RegistryRecorder { + return MultiRegistryRecorder{ + recorders: []RegistryRecorder{ // state (telemetry) newPluginGaugeRecorder(pluginName, hbregistry.TelemetryRegistry), // Record global monitors / endpoints count - newPluginCountersRecorder(pluginName, hbregistry.StatsRegistry), + NewPluginCountersRecorder(pluginName, hbregistry.StatsRegistry), // When stats for this plugin are updated, update the global stats as well stateGlobalRecorder, }, @@ -56,20 +64,16 @@ func statsForPlugin(pluginName string) registryRecorder { func init() { plugin.MustRegisterLoader(pluginKey, func(ifc interface{}) error { - p, ok := ifc.(pluginBuilder) + p, ok := ifc.(PluginFactory) if !ok { return errors.New("plugin does not match monitor plugin type") } - stats := statsForPlugin(p.name) - return globalPluginsReg.register(pluginBuilder{p.name, p.typ, p.builder, stats}) + stats := statsForPlugin(p.Name) + return GlobalPluginsReg.Register(PluginFactory{p.Name, p.Aliases, p.Builder, stats}) }) } -// PluginBuilder is the signature of functions used to build active -// monitorStarts -type PluginBuilder func(string, *common.Config) (jobs []jobs.Job, endpoints int, err error) - // Type represents whether a plugin is active or passive. type Type uint8 @@ -81,58 +85,64 @@ const ( ) // globalPluginsReg maintains the canonical list of valid Heartbeat monitorStarts at runtime. -var globalPluginsReg = newPluginsReg() +var GlobalPluginsReg = NewPluginsReg() -type pluginsReg struct { - monitors map[string]pluginBuilder +type PluginsReg struct { + monitors map[string]PluginFactory } -func newPluginsReg() *pluginsReg { - return &pluginsReg{ - monitors: map[string]pluginBuilder{}, +func NewPluginsReg() *PluginsReg { + return &PluginsReg{ + monitors: map[string]PluginFactory{}, } } -// RegisterActive registers a new active (as opposed to passive) monitor. -func RegisterActive(name string, builder PluginBuilder) { +// Register registers a new active (as opposed to passive) monitor. +func Register(name string, builder PluginFactoryCreate, aliases ...string) { stats := statsForPlugin(name) - if err := globalPluginsReg.add(pluginBuilder{name, ActiveMonitor, builder, stats}); err != nil { + if err := GlobalPluginsReg.Add(PluginFactory{name, aliases, builder, stats}); err != nil { panic(err) } } // ErrPluginAlreadyExists is returned when there is an attempt to register two plugins // with the same pluginName. -type ErrPluginAlreadyExists pluginBuilder +type ErrPluginAlreadyExists PluginFactory func (m ErrPluginAlreadyExists) Error() string { - return fmt.Sprintf("monitor plugin '%s' already exists", m.typ) + return fmt.Sprintf("monitor plugin named '%s' with Aliases %v already exists", m.Name, m.Aliases) } -func (r *pluginsReg) add(plugin pluginBuilder) error { - if _, exists := r.monitors[plugin.name]; exists { +func (r *PluginsReg) Add(plugin PluginFactory) error { + if _, exists := r.monitors[plugin.Name]; exists { return ErrPluginAlreadyExists(plugin) } - r.monitors[plugin.name] = plugin + r.monitors[plugin.Name] = plugin + for _, alias := range plugin.Aliases { + if _, exists := r.monitors[alias]; exists { + return ErrPluginAlreadyExists(plugin) + } + r.monitors[alias] = plugin + } return nil } -func (r *pluginsReg) register(plugin pluginBuilder) error { - if _, found := r.monitors[plugin.name]; found { - return fmt.Errorf("monitor type %v already exists", plugin.typ) +func (r *PluginsReg) Register(plugin PluginFactory) error { + if _, found := r.monitors[plugin.Name]; found { + return fmt.Errorf("monitor type %v already exists", plugin.Name) } - r.monitors[plugin.name] = plugin + r.monitors[plugin.Name] = plugin return nil } -func (r *pluginsReg) get(name string) (pluginBuilder, bool) { +func (r *PluginsReg) Get(name string) (PluginFactory, bool) { e, found := r.monitors[name] return e, found } -func (r *pluginsReg) String() string { +func (r *PluginsReg) String() string { var monitors []string for m := range r.monitors { monitors = append(monitors, m) @@ -142,7 +152,7 @@ func (r *pluginsReg) String() string { return fmt.Sprintf("globalPluginsReg, monitor: %v", strings.Join(monitors, ", ")) } -func (r *pluginsReg) monitorNames() []string { +func (r *PluginsReg) MonitorNames() []string { names := make([]string, 0, len(r.monitors)) for k := range r.monitors { names = append(names, k) @@ -150,17 +160,6 @@ func (r *pluginsReg) monitorNames() []string { return names } -func (e *pluginBuilder) create(cfg *common.Config) (jobs []jobs.Job, endpoints int, err error) { - return e.builder(e.name, cfg) -} - -func (t Type) String() string { - switch t { - case ActiveMonitor: - return "active" - case PassiveMonitor: - return "passive" - default: - return "unknown type" - } +func (e *PluginFactory) Create(cfg *common.Config) (p Plugin, err error) { + return e.Builder(e.Name, cfg) } diff --git a/heartbeat/monitors/regrecord.go b/heartbeat/monitors/plugin/regrecord.go similarity index 70% rename from heartbeat/monitors/regrecord.go rename to heartbeat/monitors/plugin/regrecord.go index a7c630deaa1..49d327c812b 100644 --- a/heartbeat/monitors/regrecord.go +++ b/heartbeat/monitors/plugin/regrecord.go @@ -15,46 +15,46 @@ // specific language governing permissions and limitations // under the License. -package monitors +package plugin import ( "github.com/elastic/beats/v7/libbeat/monitoring" ) -type registryRecorder interface { - startMonitor(endpoints int64) - stopMonitor(endpoints int64) +type RegistryRecorder interface { + StartMonitor(endpoints int64) + StopMonitor(endpoints int64) } -// multiRegistryRecorder composes multiple statsRecorders. -type multiRegistryRecorder struct { - recorders []registryRecorder +// MultiRegistryRecorder composes multiple statsRecorders. +type MultiRegistryRecorder struct { + recorders []RegistryRecorder } -func (mr multiRegistryRecorder) startMonitor(endpoints int64) { +func (mr MultiRegistryRecorder) StartMonitor(endpoints int64) { for _, recorder := range mr.recorders { - recorder.startMonitor(endpoints) + recorder.StartMonitor(endpoints) } } -func (mr multiRegistryRecorder) stopMonitor(endpoints int64) { +func (mr MultiRegistryRecorder) StopMonitor(endpoints int64) { for _, recorder := range mr.recorders { - recorder.stopMonitor(endpoints) + recorder.StopMonitor(endpoints) } } // countersRecorder is used to record start/stop events for a single monitor/plugin // to a single registry as counters. -type countersRecorder struct { +type CountersRecorder struct { monitorStarts *monitoring.Int monitorStops *monitoring.Int endpointStarts *monitoring.Int endpointStops *monitoring.Int } -func newPluginCountersRecorder(pluginName string, rootRegistry *monitoring.Registry) registryRecorder { +func NewPluginCountersRecorder(pluginName string, rootRegistry *monitoring.Registry) RegistryRecorder { pluginRegistry := rootRegistry.NewRegistry(pluginName) - return countersRecorder{ + return CountersRecorder{ monitoring.NewInt(pluginRegistry, "monitor_starts"), monitoring.NewInt(pluginRegistry, "monitor_stops"), monitoring.NewInt(pluginRegistry, "endpoint_starts"), @@ -62,12 +62,12 @@ func newPluginCountersRecorder(pluginName string, rootRegistry *monitoring.Regis } } -func (r countersRecorder) startMonitor(endpoints int64) { +func (r CountersRecorder) StartMonitor(endpoints int64) { r.monitorStarts.Inc() r.endpointStarts.Add(endpoints) } -func (r countersRecorder) stopMonitor(endpoints int64) { +func (r CountersRecorder) StopMonitor(endpoints int64) { r.monitorStops.Inc() r.endpointStops.Add(endpoints) } @@ -79,24 +79,24 @@ type gaugeRecorder struct { endpoints *monitoring.Int } -func newRootGaugeRecorder(r *monitoring.Registry) registryRecorder { +func newRootGaugeRecorder(r *monitoring.Registry) RegistryRecorder { return gaugeRecorder{ monitoring.NewInt(r, "monitors"), monitoring.NewInt(r, "endpoints"), } } -func newPluginGaugeRecorder(pluginName string, rootRegistry *monitoring.Registry) registryRecorder { +func newPluginGaugeRecorder(pluginName string, rootRegistry *monitoring.Registry) RegistryRecorder { pluginRegistry := rootRegistry.NewRegistry(pluginName) return newRootGaugeRecorder(pluginRegistry) } -func (r gaugeRecorder) startMonitor(endpoints int64) { +func (r gaugeRecorder) StartMonitor(endpoints int64) { r.monitors.Inc() r.endpoints.Add(endpoints) } -func (r gaugeRecorder) stopMonitor(endpoints int64) { +func (r gaugeRecorder) StopMonitor(endpoints int64) { r.monitors.Dec() r.endpoints.Sub(endpoints) } diff --git a/heartbeat/monitors/wrappers/monitors.go b/heartbeat/monitors/wrappers/monitors.go index 3b5965ac01c..dca3a8b70de 100644 --- a/heartbeat/monitors/wrappers/monitors.go +++ b/heartbeat/monitors/wrappers/monitors.go @@ -38,63 +38,91 @@ import ( // WrapCommon applies the common wrappers that all monitor jobs get. func WrapCommon(js []jobs.Job, stdMonFields stdfields.StdMonitorFields) []jobs.Job { - jobWrappers := []jobs.JobWrapper{ - addMonitorMeta(stdMonFields, len(js) > 1), - addMonitorStatus(stdMonFields.Type), - } - - if stdMonFields.Type != "browser" { - jobWrappers = append(jobWrappers, addMonitorDuration) + if stdMonFields.Type == "browser" { + return WrapBrowser(js, stdMonFields) + } else { + return WrapLightweight(js, stdMonFields) } +} +// WrapLightweight applies to http/tcp/icmp, everything but journeys involving node +func WrapLightweight(js []jobs.Job, stdMonFields stdfields.StdMonitorFields) []jobs.Job { return jobs.WrapAllSeparately( jobs.WrapAll( js, - jobWrappers..., + addMonitorMeta(stdMonFields, len(js) > 1), + addMonitorStatus(stdMonFields.Type), + addMonitorDuration, ), func() jobs.JobWrapper { return makeAddSummary(stdMonFields.Type) }) } +// WrapBrowser is pretty minimal in terms of fields added. The browser monitor +// type handles most of the fields directly, since it runs multiple jobs in a single +// run it needs to take this task on in a unique way. +func WrapBrowser(js []jobs.Job, stdMonFields stdfields.StdMonitorFields) []jobs.Job { + return jobs.WrapAll( + js, + addMonitorMeta(stdMonFields, len(js) > 1), + addMonitorStatus(stdMonFields.Type), + ) +} + // addMonitorMeta adds the id, name, and type fields to the monitor. func addMonitorMeta(stdMonFields stdfields.StdMonitorFields, isMulti bool) jobs.JobWrapper { return func(job jobs.Job) jobs.Job { return func(event *beat.Event) ([]jobs.Job, error) { - started := time.Now() cont, e := job(event) - thisID := stdMonFields.ID + addMonitorMetaFields(event, time.Now(), stdMonFields, isMulti) + return cont, e + } + } +} - if isMulti { - url, err := event.GetValue("url.full") - if err != nil { - logp.Error(errors.Wrap(err, "Mandatory url.full key missing!")) - url = "n/a" - } - urlHash, _ := hashstructure.Hash(url, nil) - thisID = fmt.Sprintf("%s-%x", stdMonFields.ID, urlHash) - } +func addMonitorMetaFields(event *beat.Event, started time.Time, sf stdfields.StdMonitorFields, isMulti bool) { + id := sf.ID + name := sf.Name - fieldsToMerge := common.MapStr{ - "monitor": common.MapStr{ - "id": thisID, - "name": stdMonFields.Name, - "type": stdMonFields.Type, - "timespan": timespan(started, stdMonFields.Schedule, stdMonFields.Timeout), - }, - } + // If multiple jobs are listed for this monitor, we can't have a single ID, so we hash the + // unique URLs to create unique suffixes for the monitor. + if isMulti { + url, err := event.GetValue("url.full") + if err != nil { + logp.Error(errors.Wrap(err, "Mandatory url.full key missing!")) + url = "n/a" + } + urlHash, _ := hashstructure.Hash(url, nil) + id = fmt.Sprintf("%s-%x", sf.ID, urlHash) + } - if stdMonFields.Service.Name != "" { - fieldsToMerge["service"] = common.MapStr{ - "name": stdMonFields.Service.Name, - } - } + // Allow jobs to override the ID, useful for browser suites + // which do this logic on their own + if v, _ := event.GetValue("monitor.id"); v != nil { + id = fmt.Sprintf("%s-%s", sf.ID, v.(string)) + } + if v, _ := event.GetValue("monitor.name"); v != nil { + name = fmt.Sprintf("%s - %s", sf.Name, v.(string)) + } - eventext.MergeEventFields(event, fieldsToMerge) + fieldsToMerge := common.MapStr{ + "monitor": common.MapStr{ + "id": id, + "name": name, + "type": sf.Type, + "timespan": timespan(started, sf.Schedule, sf.Timeout), + }, + } - return cont, e + // Add service.name for APM interop + if sf.Service.Name != "" { + fieldsToMerge["service"] = common.MapStr{ + "name": sf.Service.Name, } } + + eventext.MergeEventFields(event, fieldsToMerge) } func timespan(started time.Time, sched *schedule.Schedule, timeout time.Duration) common.MapStr { @@ -120,13 +148,6 @@ func addMonitorStatus(monitorType string) jobs.JobWrapper { return func(event *beat.Event) ([]jobs.Job, error) { cont, err := origJob(event) - // Non-summary browser events have no status associated - if monitorType == "browser" { - if t, _ := event.GetValue("synthetics.type"); t != "heartbeat/summary" { - return cont, nil - } - } - fields := common.MapStr{ "monitor": common.MapStr{ "status": look.Status(err), @@ -167,6 +188,7 @@ func makeAddSummary(monitorType string) jobs.JobWrapper { // state struct here. state := struct { mtx sync.Mutex + monitorId string remaining uint16 up uint16 down uint16 @@ -208,7 +230,6 @@ func makeAddSummary(monitorType string) jobs.JobWrapper { } } - // No error check needed here event.PutValue("monitor.check_group", state.checkGroup) // Adjust the total remaining to account for new continuations @@ -220,15 +241,7 @@ func makeAddSummary(monitorType string) jobs.JobWrapper { if state.remaining == 0 { up := state.up down := state.down - if monitorType == "browser" { - if eventStatus == "down" { - up = 0 - down = 1 - } else { - up = 1 - down = 0 - } - } + eventext.MergeEventFields(event, common.MapStr{ "summary": common.MapStr{ "up": up, diff --git a/heartbeat/monitors/wrappers/monitors_test.go b/heartbeat/monitors/wrappers/monitors_test.go index d8ff497225e..88e6fd76997 100644 --- a/heartbeat/monitors/wrappers/monitors_test.go +++ b/heartbeat/monitors/wrappers/monitors_test.go @@ -56,6 +56,14 @@ var testMonFields = stdfields.StdMonitorFields{ Timeout: 1, } +var testBrowserMonFields = stdfields.StdMonitorFields{ + ID: "myid", + Name: "myname", + Type: "browser", + Schedule: schedule.MustParse("@every 1s"), + Timeout: 1, +} + func testCommonWrap(t *testing.T, tt testDef) { t.Run(tt.name, func(t *testing.T) { wrapped := WrapCommon(tt.jobs, tt.stdFields) @@ -387,3 +395,105 @@ func TestTimespan(t *testing.T) { }) } } + +func makeInlineBrowserJob(t *testing.T, u string) jobs.Job { + parsed, err := url.Parse(u) + require.NoError(t, err) + return func(event *beat.Event) (i []jobs.Job, e error) { + eventext.MergeEventFields(event, common.MapStr{ + "url": URLFields(parsed), + "monitor": common.MapStr{ + "check_group": "inline-check-group", + }, + }) + return nil, nil + } +} + +// Inline browser jobs function very similarly to lightweight jobs +// in that they don't override the ID. +// They do not, however, get a summary field added, nor duration. +func TestInlineBrowserJob(t *testing.T) { + fields := testBrowserMonFields + testCommonWrap(t, testDef{ + "simple", + fields, + []jobs.Job{makeInlineBrowserJob(t, "http://foo.com")}, + []validator.Validator{ + lookslike.Strict( + lookslike.Compose( + urlValidator(t, "http://foo.com"), + lookslike.MustCompile(map[string]interface{}{ + "monitor": map[string]interface{}{ + "id": testMonFields.ID, + "name": testMonFields.Name, + "type": fields.Type, + "status": "up", + "check_group": "inline-check-group", + }, + }), + hbtestllext.MonitorTimespanValidator, + ), + ), + }, + nil, + }) +} + +var suiteBrowserJobValues = struct { + id string + name string + checkGroup string +}{ + id: "journey_1", + name: "Journey 1", + checkGroup: "journey-1-check-group", +} + +func makeSuiteBrowserJob(t *testing.T, u string) jobs.Job { + parsed, err := url.Parse(u) + require.NoError(t, err) + return func(event *beat.Event) (i []jobs.Job, e error) { + eventext.MergeEventFields(event, common.MapStr{ + "url": URLFields(parsed), + "monitor": common.MapStr{ + "id": suiteBrowserJobValues.id, + "name": suiteBrowserJobValues.name, + "check_group": suiteBrowserJobValues.checkGroup, + }, + }) + return nil, nil + } +} + +func TestSuiteBrowserJob(t *testing.T) { + fields := testBrowserMonFields + urlStr := "http://foo.com" + urlU, _ := url.Parse(urlStr) + testCommonWrap(t, testDef{ + "simple", + fields, + []jobs.Job{makeSuiteBrowserJob(t, urlStr)}, + []validator.Validator{ + lookslike.Compose( + urlValidator(t, urlStr), + lookslike.Strict( + lookslike.MustCompile(map[string]interface{}{ + "monitor": map[string]interface{}{ + "id": fmt.Sprintf("%s-%s", testMonFields.ID, suiteBrowserJobValues.id), + "name": fmt.Sprintf("%s - %s", testMonFields.Name, suiteBrowserJobValues.name), + "type": fields.Type, + "check_group": suiteBrowserJobValues.checkGroup, + "status": "up", + "timespan": common.MapStr{ + "gte": hbtestllext.IsTime, + "lt": hbtestllext.IsTime, + }, + }, + "url": URLFields(urlU), + }), + ), + )}, + nil, + }) +} diff --git a/x-pack/heartbeat/monitors/browser/browser.go b/x-pack/heartbeat/monitors/browser/browser.go index f3cb1e6483e..76e02f1ff28 100644 --- a/x-pack/heartbeat/monitors/browser/browser.go +++ b/x-pack/heartbeat/monitors/browser/browser.go @@ -11,28 +11,28 @@ import ( "os/user" "sync" - "github.com/elastic/beats/v7/heartbeat/monitors" "github.com/elastic/beats/v7/heartbeat/monitors/jobs" + "github.com/elastic/beats/v7/heartbeat/monitors/plugin" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/x-pack/heartbeat/monitors/browser/synthexec" ) func init() { - monitors.RegisterActive("browser", create) - monitors.RegisterActive("synthetic/browser", create) + plugin.Register("browser", create, "synthetic", "synthetics/synthetic") } var showExperimentalOnce = sync.Once{} var NotSyntheticsCapableError = fmt.Errorf("synthetic monitors cannot be created outside the official elastic docker image") -func create(name string, cfg *common.Config) (js []jobs.Job, endpoints int, err error) { +func create(name string, cfg *common.Config) (p plugin.Plugin, err error) { // We don't want users running synthetics in environments that don't have the required GUI libraries etc, so we check // this flag. When we're ready to support the many possible configurations of systems outside the docker environment // we can remove this check. if os.Getenv("ELASTIC_SYNTHETICS_CAPABLE") != "true" { - return nil, 0, NotSyntheticsCapableError + return plugin.Plugin{}, NotSyntheticsCapableError } showExperimentalOnce.Do(func() { @@ -41,25 +41,37 @@ func create(name string, cfg *common.Config) (js []jobs.Job, endpoints int, err curUser, err := user.Current() if err != nil { - return nil, 0, fmt.Errorf("could not determine current user for script monitor %w: ", err) + return plugin.Plugin{}, fmt.Errorf("could not determine current user for script monitor %w: ", err) } if curUser.Uid == "0" { - return nil, 0, fmt.Errorf("script monitors cannot be run as root! Current UID is %s", curUser.Uid) + return plugin.Plugin{}, fmt.Errorf("script monitors cannot be run as root! Current UID is %s", curUser.Uid) } - config := defaultConfig - if err := cfg.Unpack(&config); err != nil { - return nil, 0, err + ss, err := NewSuite(cfg) + if err != nil { + return plugin.Plugin{}, err } var j jobs.Job - if config.Path != "" { - j, err = synthexec.SuiteJob(context.TODO(), config.Path, config.JourneyName, config.Params) - if err != nil { - return nil, 0, err - } + if src, ok := ss.InlineSource(); ok { + j = synthexec.InlineJourneyJob(context.TODO(), src, ss.Params()) } else { - j = synthexec.InlineJourneyJob(context.TODO(), config.Script, config.Params) + j = func(event *beat.Event) ([]jobs.Job, error) { + err := ss.Fetch() + if err != nil { + return nil, fmt.Errorf("could not fetch for suite job: %w", err) + } + sj, err := synthexec.SuiteJob(context.TODO(), ss.Workdir(), ss.Params()) + if err != nil { + return nil, err + } + return sj(event) + } } - return []jobs.Job{j}, 1, nil + + return plugin.Plugin{ + Jobs: []jobs.Job{j}, + Close: ss.Close, + Endpoints: 1, + }, nil } diff --git a/x-pack/heartbeat/monitors/browser/config.go b/x-pack/heartbeat/monitors/browser/config.go index 9c998d7d036..0cbb699da88 100644 --- a/x-pack/heartbeat/monitors/browser/config.go +++ b/x-pack/heartbeat/monitors/browser/config.go @@ -8,20 +8,30 @@ import ( "fmt" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/x-pack/heartbeat/monitors/browser/source" ) type Config struct { - Path string `config:"path"` - Script string `config:"script"` - Params common.MapStr `config:"script_params"` - JourneyName string `config:"journey_name"` + Schedule string `config:"schedule"` + Params map[string]interface{} `config:"params"` + RawConfig *common.Config + Source *source.Source `config:"source"` + // Name is optional for lightweight checks but required for browsers + Name string `config:"name"` + // Id is optional for lightweight checks but required for browsers + Id string `config:"id"` } +var ErrNameRequired = fmt.Errorf("config 'name' must be specified for this monitor") +var ErrIdRequired = fmt.Errorf("config 'id' must be specified for this monitor") + func (c *Config) Validate() error { - if c.Script != "" && c.Path != "" { - return fmt.Errorf("both path and script specified! Only one of these options may be present!") + if c.Name == "" { + return ErrNameRequired + } + if c.Id == "" { + return ErrIdRequired } + return nil } - -var defaultConfig = Config{} diff --git a/x-pack/heartbeat/monitors/browser/source/fixtures/todos/add-remove.journey.ts b/x-pack/heartbeat/monitors/browser/source/fixtures/todos/add-remove.journey.ts new file mode 100644 index 00000000000..bb80e031cec --- /dev/null +++ b/x-pack/heartbeat/monitors/browser/source/fixtures/todos/add-remove.journey.ts @@ -0,0 +1,38 @@ +import { journey } from '@elastic/synthetics'; +import { + loadAppStep, + addTaskStep, + assertTaskListSizeStep, + checkForTaskStep, + destroyTaskStep, +} from './helpers'; + +journey('basic addition and completion of single task', async ({ page }) => { + const testText = "Don't put salt in your eyes"; + + loadAppStep(page); + addTaskStep(page, testText); + assertTaskListSizeStep(page, 1); + checkForTaskStep(page, testText); + destroyTaskStep(page, testText); + assertTaskListSizeStep(page, 0); +}); + +journey('adding and removing a few tasks', async ({ page }) => { + const testTasks = ['Task 1', 'Task 2', 'Task 3']; + + loadAppStep(page); + testTasks.forEach(t => { + addTaskStep(page, t); + }); + + assertTaskListSizeStep(page, 3); + + // remove the middle task and check that it worked + destroyTaskStep(page, testTasks[1]); + assertTaskListSizeStep(page, 2); + + // add a new task and check it exists + addTaskStep(page, 'Task 4'); + assertTaskListSizeStep(page, 3); +}); diff --git a/x-pack/heartbeat/monitors/browser/source/fixtures/todos/basics.journey.ts b/x-pack/heartbeat/monitors/browser/source/fixtures/todos/basics.journey.ts new file mode 100644 index 00000000000..f8f8f522635 --- /dev/null +++ b/x-pack/heartbeat/monitors/browser/source/fixtures/todos/basics.journey.ts @@ -0,0 +1,30 @@ +import { journey, step } from '@elastic/synthetics'; +import { deepStrictEqual } from 'assert'; +import { join } from 'path'; + +journey('check that title is present', async ({ page }) => { + step('go to app', async () => { + const path = 'file://' + join(__dirname, 'app', 'index.html'); + await page.goto(path); + }); + + step('check title is present', async () => { + const header = await page.$('h1'); + deepStrictEqual(await header.textContent(), 'todos'); + }); +}); + +journey('check that input placeholder is correct', async ({ page }) => { + step('go to app', async () => { + const path = 'file://' + join(__dirname, 'app', 'index.html'); + await page.goto(path); + }); + + step('check title is present', async () => { + const input = await page.$('input.new-todo'); + deepStrictEqual( + await input.getAttribute('placeholder'), + 'What nneeds to be done?' + ); + }); +}); diff --git a/x-pack/heartbeat/monitors/browser/source/fixtures/todos/helpers.ts b/x-pack/heartbeat/monitors/browser/source/fixtures/todos/helpers.ts new file mode 100644 index 00000000000..bb69aeddbad --- /dev/null +++ b/x-pack/heartbeat/monitors/browser/source/fixtures/todos/helpers.ts @@ -0,0 +1,54 @@ +import { step } from '@elastic/synthetics'; +import * as assert from 'assert'; +import { join } from 'path'; +import { Page } from 'playwright-core'; + +export const loadAppStep = (page: Page) => { + step('go to app', async () => { + const path = 'file://' + join(__dirname, 'app', 'index.html'); + await page.goto(path); + }); +}; + +export const addTaskStep = (page: Page, task: string) => { + step(`add task ${task}`, async () => { + const input = await page.$('input.new-todo'); + await input.type(task); + await input.press('Enter'); + }); +}; + +const todosSelector = 'ul.todo-list li.todo'; + +export const findTask = async (page: Page, task: string) => { + return await page.waitForSelector(`${todosSelector} >> text="${task}"`); +}; + +export const assertTaskListSizeStep = async (page: Page, size: number) => { + step(`check that task list has exactly ${size} elements`, async () => { + assert.deepEqual((await page.$$(todosSelector)).length, size); + }); +}; + +export const checkForTaskStep = async (page: Page, task: string) => { + step(`check for task '${task}' in list`, async () => { + return findTask(page, task); + }); +}; + +export const destroyTaskStep = async (page: Page, task: string) => { + step(`destroy task '${task}'`, async () => { + const label = await findTask(page, task); + // xpath indexes arrays starting at 1!!! Easy to forget! + const li = await label.$('xpath=ancestor::li[1]'); + const destroyButton = await li.$('button'); + + // The destroy button is not visible until hovered. Setup a click test which + // will wait up to 30s for the button to be visible. + const clickFuture = destroyButton.click(); + // now hover, making the destroy button clickable + li.hover(); + // now we are done + await clickFuture; + }); +}; diff --git a/x-pack/heartbeat/monitors/browser/source/fixtures/todos/node_modules/test.json b/x-pack/heartbeat/monitors/browser/source/fixtures/todos/node_modules/test.json new file mode 100644 index 00000000000..e69de29bb2d diff --git a/x-pack/heartbeat/monitors/browser/source/fixtures/todos/package.json b/x-pack/heartbeat/monitors/browser/source/fixtures/todos/package.json new file mode 100644 index 00000000000..5972cf95b98 --- /dev/null +++ b/x-pack/heartbeat/monitors/browser/source/fixtures/todos/package.json @@ -0,0 +1,10 @@ +{ + "name": "todos", + "private": true, + "description": "This suite tests the examples that ship with the open source Vue.js project.", + "scripts": {}, + "dependencies": { + "@elastic/synthetics": "*", + "playwright-core": "=1.6.2" + } +} diff --git a/x-pack/heartbeat/monitors/browser/source/inline.go b/x-pack/heartbeat/monitors/browser/source/inline.go new file mode 100644 index 00000000000..d3ef2c452e5 --- /dev/null +++ b/x-pack/heartbeat/monitors/browser/source/inline.go @@ -0,0 +1,35 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package source + +import ( + "fmt" + "regexp" +) + +type InlineSource struct { + Script string `config:"script"` + BaseSource +} + +func (s *InlineSource) Validate() error { + if !regexp.MustCompile("\\S").MatchString(s.Script) { + return fmt.Errorf("no 'script' value specified for inline source") + } + + return nil +} + +func (s *InlineSource) Fetch() (err error) { + return nil +} + +func (s *InlineSource) Workdir() string { + return "" +} + +func (s *InlineSource) Close() error { + return nil +} diff --git a/x-pack/heartbeat/monitors/browser/source/local.go b/x-pack/heartbeat/monitors/browser/source/local.go new file mode 100644 index 00000000000..61ec0a1d255 --- /dev/null +++ b/x-pack/heartbeat/monitors/browser/source/local.go @@ -0,0 +1,142 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package source + +import ( + "fmt" + "io/ioutil" + "os" + "os/exec" + "path" + "path/filepath" + + "github.com/elastic/beats/v7/libbeat/logp" + + "github.com/otiai10/copy" +) + +type LocalSource struct { + OrigPath string `config:"path"` + workingPath string + BaseSource +} + +var ErrNoPath = fmt.Errorf("local source defined with no path specified") + +func ErrInvalidPath(path string) error { + return fmt.Errorf("local source has invalid path '%s'", path) +} + +func (l *LocalSource) Validate() error { + if l.OrigPath == "" { + return ErrNoPath + } + + s, err := os.Stat(l.OrigPath) + base := ErrInvalidPath(l.OrigPath) + if err != nil { + return fmt.Errorf("%s: %w", base, err) + } + if !s.IsDir() { + return fmt.Errorf("%s: path points to a non-directory", base) + } + + return nil +} + +func (l *LocalSource) Fetch() (err error) { + if l.workingPath != "" { + return nil + } + l.workingPath, err = ioutil.TempDir("/tmp", "elastic-synthetics-") + if err != nil { + return fmt.Errorf("could not create tmp dir: %w", err) + } + defer func() { + if err != nil { + err := l.Close() // cleanup the dir if this function returns an err + if err != nil { + logp.Warn("could not cleanup dir: %s", err) + } + } + }() + + err = copy.Copy(l.OrigPath, l.workingPath) + if err != nil { + return fmt.Errorf("could not copy suite: %w", err) + } + + dir, err := getAbsoluteSuiteDir(l.workingPath) + if err != nil { + return err + } + + if !Offline() { + err = setupOnlineDir(dir) + return err + } + + return nil +} + +// setupOnlineDir is run in environments with internet access and attempts to make sure the node env +// is setup correctly. +func setupOnlineDir(dir string) (err error) { + // If we're not offline remove the node_modules folder so we can do a fresh install, this minimizes + // issues with dependencies being broken. + modDir := path.Join(dir, "node_modules") + _, statErr := os.Stat(modDir) + if os.IsExist(statErr) { + err := os.RemoveAll(modDir) + if err != nil { + return fmt.Errorf("could not remove node_modules from '%s': %w", dir, err) + } + } + + // Ensure all deps installed + err = runSimpleCommand(exec.Command("npm", "install"), dir) + if err != nil { + return err + } + + return err +} + +func (l *LocalSource) Workdir() string { + return l.workingPath +} + +func (l *LocalSource) Close() error { + if l.workingPath != "" { + return os.RemoveAll(l.workingPath) + } + + return nil +} + +func getAbsoluteSuiteDir(suiteFile string) (string, error) { + absPath, err := filepath.Abs(suiteFile) + if err != nil { + return "", err + } + stat, err := os.Stat(absPath) + if err != nil { + return "", err + } + + if stat.IsDir() { + return suiteFile, nil + } + + return filepath.Dir(suiteFile), nil +} + +func runSimpleCommand(cmd *exec.Cmd, dir string) error { + cmd.Dir = dir + logp.Info("Running %s in %s", cmd, dir) + output, err := cmd.CombinedOutput() + logp.Info("Ran %s got %s", cmd, string(output)) + return err +} diff --git a/x-pack/heartbeat/monitors/browser/source/local_test.go b/x-pack/heartbeat/monitors/browser/source/local_test.go new file mode 100644 index 00000000000..7deeb7bbf25 --- /dev/null +++ b/x-pack/heartbeat/monitors/browser/source/local_test.go @@ -0,0 +1,70 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package source + +import ( + "os" + "path" + "path/filepath" + "runtime" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestLocalSourceValidate(t *testing.T) { + tests := []struct { + name string + OrigPath string + err error + }{ + {"valid", "./", nil}, + {"invalid", "/not/a/path", ErrInvalidPath("/not/a/path")}, + {"nopath", "", ErrNoPath}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + l := &LocalSource{OrigPath: tt.OrigPath} + err := l.Validate() + if tt.err == nil { + require.NoError(t, err) + } else { + require.Regexp(t, tt.err, err) + } + }) + } +} + +func TestLocalSourceLifeCycle(t *testing.T) { + _, filename, _, _ := runtime.Caller(0) + origPath := path.Join(filepath.Dir(filename), "fixtures/todos") + ls := LocalSource{OrigPath: origPath} + require.NoError(t, ls.Validate()) + + // Don't run the NPM commands in unit tests + // We can leave that for E2E tests + GoOffline() + defer GoOnline() + require.NoError(t, ls.Fetch()) + + require.NotEmpty(t, ls.workingPath) + expected := []string{ + "node_modules", + "package.json", + "helpers.ts", + "add-remove.journey.ts", + "basics.journey.ts", + } + for _, file := range expected { + _, err := os.Stat(path.Join(ls.Workdir(), file)) + // assert, not require, because we want to proceed to the close bit + assert.NoError(t, err) + } + + require.NoError(t, ls.Close()) + _, err := os.Stat(ls.Workdir()) + require.True(t, os.IsNotExist(err), "Workdir %s should have been deleted", ls.Workdir()) +} diff --git a/x-pack/heartbeat/monitors/browser/source/offline.go b/x-pack/heartbeat/monitors/browser/source/offline.go new file mode 100644 index 00000000000..0347958a812 --- /dev/null +++ b/x-pack/heartbeat/monitors/browser/source/offline.go @@ -0,0 +1,31 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package source + +import "os" + +var offlineEnvVar = "ELASTIC_SYNTHETICS_OFFLINE" + +// Offline checks whether sources should act in offline mode, where +// calls to NPM are forbidden. +func Offline() bool { + return os.Getenv(offlineEnvVar) == "true" +} + +// GoOffline switches our current state to offline. Primarily for tests. +func GoOffline() { + e := os.Setenv(offlineEnvVar, "true") + if e != nil { + panic("could not set offline env var!") + } +} + +// GoOffline switches our current state to offline. Primarily for tests. +func GoOnline() { + e := os.Setenv(offlineEnvVar, "false") + if e != nil { + panic("could not set offline env var!") + } +} diff --git a/x-pack/heartbeat/monitors/browser/source/source.go b/x-pack/heartbeat/monitors/browser/source/source.go new file mode 100644 index 00000000000..62c7ce03e9e --- /dev/null +++ b/x-pack/heartbeat/monitors/browser/source/source.go @@ -0,0 +1,48 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package source + +import ( + "fmt" +) + +type Source struct { + Local *LocalSource `config:"local"` + Inline *InlineSource `config:"inline"` + ActiveMemo ISource // cache for selected source +} + +func (s *Source) Active() ISource { + if s.ActiveMemo != nil { + return s.ActiveMemo + } + + if s.Local != nil { + s.ActiveMemo = s.Local + } else if s.Inline != nil { + s.ActiveMemo = s.Inline + } + + return s.ActiveMemo +} + +var ErrInvalidSource = fmt.Errorf("no or unknown source type specified for synthetic monitor") + +func (s *Source) Validate() error { + if s.Active() == nil { + return ErrInvalidSource + } + return nil +} + +type ISource interface { + Fetch() error + Workdir() string + Close() error +} + +type BaseSource struct { + Type string `config:"type"` +} diff --git a/x-pack/heartbeat/monitors/browser/suite_runner.go b/x-pack/heartbeat/monitors/browser/suite_runner.go new file mode 100644 index 00000000000..73b597766ff --- /dev/null +++ b/x-pack/heartbeat/monitors/browser/suite_runner.go @@ -0,0 +1,69 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package browser + +import ( + "context" + "fmt" + + "github.com/elastic/beats/v7/libbeat/common" +) + +type JourneyLister func(ctx context.Context, suitePath string, params common.MapStr) (journeyNames []string, err error) + +var journeyListSingleton JourneyLister + +type SyntheticSuite struct { + rawCfg *common.Config + suiteCfg *Config +} + +func NewSuite(rawCfg *common.Config) (*SyntheticSuite, error) { + ss := &SyntheticSuite{ + rawCfg: rawCfg, + suiteCfg: &Config{}, + } + err := rawCfg.Unpack(ss.suiteCfg) + if err != nil { + return nil, ErrBadConfig(err) + } + + return ss, nil +} + +func ErrBadConfig(err error) error { + return fmt.Errorf("could not parse suite config: %w", err) +} + +func (s *SyntheticSuite) String() string { + panic("implement me") +} + +func (s *SyntheticSuite) Fetch() error { + return s.suiteCfg.Source.Active().Fetch() +} + +func (s *SyntheticSuite) Workdir() string { + return s.suiteCfg.Source.Active().Workdir() +} + +func (s *SyntheticSuite) InlineSource() (string, bool) { + if s.suiteCfg.Source.Inline != nil { + return s.suiteCfg.Source.Inline.Script, true + } + return "", false +} + +func (s *SyntheticSuite) Params() map[string]interface{} { + return s.suiteCfg.Params +} + +func (s *SyntheticSuite) Close() error { + if s.suiteCfg.Source.ActiveMemo != nil { + s.suiteCfg.Source.ActiveMemo.Close() + } + + return nil +} diff --git a/x-pack/heartbeat/monitors/browser/suite_runner_test.go b/x-pack/heartbeat/monitors/browser/suite_runner_test.go new file mode 100644 index 00000000000..9f9448440cc --- /dev/null +++ b/x-pack/heartbeat/monitors/browser/suite_runner_test.go @@ -0,0 +1,116 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package browser + +import ( + "path" + "path/filepath" + "runtime" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/x-pack/heartbeat/monitors/browser/source" +) + +func TestValidLocal(t *testing.T) { + _, filename, _, _ := runtime.Caller(0) + path := path.Join(filepath.Dir(filename), "source/fixtures/todos") + testParams := map[string]interface{}{ + "key1": "value1", + "key2": "value2", + } + cfg := common.MustNewConfigFrom(common.MapStr{ + "name": "My Name", + "id": "myId", + "params": testParams, + "source": common.MapStr{ + "local": common.MapStr{ + "path": path, + }, + }, + }) + s, e := NewSuite(cfg) + require.NoError(t, e) + require.NotNil(t, s) + _, ok := s.InlineSource() + require.False(t, ok) + + source.GoOffline() + defer source.GoOnline() + require.NoError(t, s.Fetch()) + defer require.NoError(t, s.Close()) + require.Regexp(t, "\\w{1,}", s.Workdir()) + require.Equal(t, testParams, s.Params()) + + e = s.Close() + require.NoError(t, e) +} + +func TestValidInline(t *testing.T) { + script := "a script" + testParams := map[string]interface{}{ + "key1": "value1", + "key2": "value2", + } + cfg := common.MustNewConfigFrom(common.MapStr{ + "name": "My Name", + "id": "myId", + "params": testParams, + "source": common.MapStr{ + "inline": common.MapStr{ + "script": script, + }, + }, + }) + s, e := NewSuite(cfg) + require.NoError(t, e) + require.NotNil(t, s) + sSrc, ok := s.InlineSource() + require.True(t, ok) + require.Equal(t, script, sSrc) + require.Equal(t, "", s.Workdir()) + require.Equal(t, testParams, s.Params()) + + e = s.Close() + require.NoError(t, e) +} + +func TestNameRequired(t *testing.T) { + cfg := common.MustNewConfigFrom(common.MapStr{ + "id": "myId", + "source": common.MapStr{ + "inline": common.MapStr{ + "script": "a script", + }, + }, + }) + _, e := NewSuite(cfg) + require.Regexp(t, ErrNameRequired, e) +} + +func TestIDRequired(t *testing.T) { + cfg := common.MustNewConfigFrom(common.MapStr{ + "name": "My Name", + "source": common.MapStr{ + "inline": common.MapStr{ + "script": "a script", + }, + }, + }) + _, e := NewSuite(cfg) + require.Regexp(t, ErrIdRequired, e) +} + +func TestEmptySource(t *testing.T) { + cfg := common.MustNewConfigFrom(common.MapStr{ + "source": common.MapStr{}, + }) + s, e := NewSuite(cfg) + + require.Regexp(t, ErrBadConfig(source.ErrInvalidSource), e) + require.Nil(t, s) +} diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go index 426b5da2bd7..0df01cf278a 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go @@ -8,15 +8,33 @@ import ( "fmt" "time" + "github.com/gofrs/uuid" + "github.com/elastic/beats/v7/heartbeat/eventext" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" ) +type enricher func(event *beat.Event, se *SynthEvent) error + +type streamEnricher struct { + je *journeyEnricher +} + +func (e *streamEnricher) enrich(event *beat.Event, se *SynthEvent) error { + if e.je == nil || (se != nil && se.Type == "journey/start") { + e.je = newJourneyEnricher() + } + + return e.je.enrich(event, se) +} + // journeyEnricher holds state across received SynthEvents retaining fields // where relevant to properly enrich *beat.Event instances. type journeyEnricher struct { journeyComplete bool + journey *Journey + checkGroup string errorCount int lastError error stepCount int @@ -28,15 +46,32 @@ type journeyEnricher struct { } func newJourneyEnricher() *journeyEnricher { - return &journeyEnricher{} + return &journeyEnricher{ + checkGroup: makeUuid(), + } +} + +func makeUuid() string { + u, err := uuid.NewV1() + if err != nil { + panic("Cannot generate v1 UUID, this should never happen!") + } + return u.String() } func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent) error { - if se != nil && !se.Timestamp().IsZero() { + if se == nil { + return nil + } + + if !se.Timestamp().IsZero() { event.Timestamp = se.Timestamp() // Record start and end so we can calculate journey duration accurately later switch se.Type { case "journey/start": + je.lastError = nil + je.checkGroup = makeUuid() + je.journey = se.Journey je.start = event.Timestamp case "journey/end": je.end = event.Timestamp @@ -45,9 +80,19 @@ func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent) error { event.Timestamp = time.Now() } - // No more synthEvents? In this case this is the summary event - if se == nil { - return je.createSummary(event) + eventext.MergeEventFields(event, common.MapStr{ + "monitor": common.MapStr{ + "check_group": je.checkGroup, + }, + }) + // Inline jobs have no journey + if je.journey != nil { + eventext.MergeEventFields(event, common.MapStr{ + "monitor": common.MapStr{ + "id": je.journey.Id, + "name": je.journey.Name, + }, + }) } return je.enrichSynthEvent(event, se) @@ -57,6 +102,7 @@ func (je *journeyEnricher) enrichSynthEvent(event *beat.Event, se *SynthEvent) e switch se.Type { case "journey/end": je.journeyComplete = true + return je.createSummary(event) case "step/end": je.stepCount++ } @@ -82,20 +128,35 @@ func (je *journeyEnricher) enrichSynthEvent(event *beat.Event, se *SynthEvent) e } func (je *journeyEnricher) createSummary(event *beat.Event) error { + var up, down int + if je.errorCount > 0 { + up = 0 + down = 1 + } else { + up = 1 + down = 0 + } + if je.journeyComplete { eventext.MergeEventFields(event, common.MapStr{ "url": je.urlFields, "synthetics": common.MapStr{ - "type": "heartbeat/summary", + "type": "heartbeat/summary", + "journey": je.journey, }, "monitor": common.MapStr{ "duration": common.MapStr{ "us": int64(je.end.Sub(je.start) / time.Microsecond), }, }, + "summary": common.MapStr{ + "up": up, + "down": down, + }, }) return je.lastError } + return fmt.Errorf("journey did not finish executing, %d steps ran", je.stepCount) } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go index 3ea3eb1ec58..cf1cc0dd6cf 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go @@ -75,12 +75,12 @@ func TestJourneyEnricher(t *testing.T) { // We need an expectation for each input // plus a final expectation for the summary which comes // on the nil data. - for idx, se := range append(synthEvents, nil) { + for idx, se := range synthEvents { e := &beat.Event{} t.Run(fmt.Sprintf("event %d", idx), func(t *testing.T) { enrichErr := je.enrich(e, se) - if se != nil { + if se != nil && se.Type != "journey/end" { // Test that the created event includes the mapped // version of the event testslike.Test(t, lookslike.MustCompile(se.ToMap()), e.Fields) @@ -89,7 +89,7 @@ func TestJourneyEnricher(t *testing.T) { if se.Error != nil { require.Equal(t, stepError(se.Error), enrichErr) } - } else { + } else { // journey end gets a summary require.Equal(t, stepError(syntherr), enrichErr) u, _ := url.Parse(url1) diff --git a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go index 29fa9a8f3b6..68e627b1cf7 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go @@ -5,13 +5,17 @@ package synthexec import ( + "encoding/json" + "github.com/elastic/beats/v7/libbeat/common/atomic" + "github.com/elastic/beats/v7/libbeat/logp" ) type ExecMultiplexer struct { - eventCounter *atomic.Int - synthEvents chan *SynthEvent - done chan struct{} + currentJourney *atomic.Bool + eventCounter *atomic.Int + synthEvents chan *SynthEvent + done chan struct{} } func (e ExecMultiplexer) Close() { @@ -22,8 +26,24 @@ func (e ExecMultiplexer) writeSynthEvent(se *SynthEvent) { if se == nil { // we skip writing nil events, since a nil means we're done return } + + if se.Type == "journey/start" { + e.currentJourney.Store(true) + e.eventCounter.Store(-1) + } + hasCurrentJourney := e.currentJourney.Load() + if se.Type == "journey/end" { + e.currentJourney.Store(false) + } + + out, err := json.Marshal(se) + se.index = e.eventCounter.Inc() - e.synthEvents <- se + if hasCurrentJourney { + e.synthEvents <- se + } else { + logp.Warn("received output from synthetics outside of journey scope: %s %s", out, err) + } } // SynthEvents returns a read only channel for synth events @@ -43,8 +63,9 @@ func (e ExecMultiplexer) Wait() { func NewExecMultiplexer() *ExecMultiplexer { return &ExecMultiplexer{ - eventCounter: atomic.NewInt(-1), // Start from -1 so first call to Inc returns 0 - synthEvents: make(chan *SynthEvent), - done: make(chan struct{}), + currentJourney: atomic.NewBool(false), + eventCounter: atomic.NewInt(-1), // Start from -1 so first call to Inc returns 0 + synthEvents: make(chan *SynthEvent), + done: make(chan struct{}), } } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer_test.go b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer_test.go new file mode 100644 index 00000000000..6cbc34b6889 --- /dev/null +++ b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer_test.go @@ -0,0 +1,85 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package synthexec + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestExecMultiplexer(t *testing.T) { + em := NewExecMultiplexer() + + // Generate three fake journeys with three fake steps + var testJourneys []*Journey + var testEvents []*SynthEvent + time := float64(0) + for jIdx := 0; jIdx < 3; jIdx++ { + time++ // fake time to make events seem spaced out + journey := &Journey{ + Name: fmt.Sprintf("J%d", jIdx), + Id: fmt.Sprintf("j-%d", jIdx), + } + testJourneys = append(testJourneys, journey) + testEvents = append(testEvents, &SynthEvent{ + Journey: journey, + Type: "journey/start", + TimestampEpochMicros: time, + }) + + for sIdx := 0; sIdx < 3; sIdx++ { + step := &Step{ + Name: fmt.Sprintf("S%d", sIdx), + Index: sIdx, + } + + testEvents = append(testEvents, &SynthEvent{ + Journey: journey, + Step: step, + TimestampEpochMicros: time, + }) + } + + testEvents = append(testEvents, &SynthEvent{ + Journey: journey, + Type: "journey/end", + TimestampEpochMicros: time, + }) + } + + // Write the test events in another go routine since writes block + var results []*SynthEvent + go func() { + for _, se := range testEvents { + em.writeSynthEvent(se) + } + em.Close() + }() + + // Wait for all results +Loop: + for { + select { + case result := <-em.synthEvents: + if result == nil { + break Loop + } + results = append(results, result) + } + } + + require.Len(t, results, len(testEvents)) + i := 0 // counter for index, resets on journey change + for _, se := range results { + require.Equal(t, i, se.index) + if se.Type == "journey/end" { + i = 0 + } else { + i++ + } + } +} diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go b/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go index 41a5c1ae88c..59a44bbe59a 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go @@ -18,7 +18,6 @@ import ( "sync" "time" - "github.com/elastic/beats/v7/heartbeat/beater" "github.com/elastic/beats/v7/heartbeat/monitors/jobs" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" @@ -27,58 +26,11 @@ import ( const debugSelector = "synthexec" -func init() { - beater.RegisterJourneyLister(ListJourneys) -} - -// ListJourneys takes the given suite performs a dry run, capturing the Journey names, and returns the list. -func ListJourneys(ctx context.Context, suiteFile string, params common.MapStr) (journeyNames []string, err error) { - dir, err := getSuiteDir(suiteFile) - if err != nil { - return nil, err - } - - if os.Getenv("ELASTIC_SYNTHETICS_OFFLINE") != "true" { - // Ensure all deps installed - err = runSimpleCommand(exec.Command("npm", "install"), dir) - if err != nil { - return nil, err - } - - // Update playwright, needs to run separately to ensure post-install hook is run that downloads - // chrome. See https://github.com/microsoft/playwright/issues/3712 - err = runSimpleCommand(exec.Command("npm", "install", "playwright-chromium"), dir) - if err != nil { - return nil, err - } - } - - cmdFactory, err := suiteCommandFactory(dir, suiteFile, "--dry-run") - if err != nil { - return nil, err - } - - mpx, err := runCmd(ctx, cmdFactory(), nil, params) -Outer: - for { - select { - case se := <-mpx.SynthEvents(): - if se == nil { - break Outer - } - if se.Type == "journey/register" { - journeyNames = append(journeyNames, se.Journey.Name) - } - } - } - - logp.Info("Discovered journeys %#v", journeyNames) - return journeyNames, nil -} - // SuiteJob will run a single journey by name from the given suite. -func SuiteJob(ctx context.Context, suiteFile string, journeyName string, params common.MapStr) (jobs.Job, error) { - newCmd, err := suiteCommandFactory(suiteFile, suiteFile, "--screenshots", "--journey-name", journeyName) +func SuiteJob(ctx context.Context, suitePath string, params common.MapStr) (jobs.Job, error) { + // Run the command in the given suitePath, use '.' as the first arg since the command runs + // in the correct dir + newCmd, err := suiteCommandFactory(suitePath, ".", "--screenshots") if err != nil { return nil, err } @@ -86,8 +38,8 @@ func SuiteJob(ctx context.Context, suiteFile string, journeyName string, params return startCmdJob(ctx, newCmd, nil, params), nil } -func suiteCommandFactory(suiteFile string, args ...string) (func() *exec.Cmd, error) { - npmRoot, err := getNpmRoot(suiteFile) +func suiteCommandFactory(suitePath string, args ...string) (func() *exec.Cmd, error) { + npmRoot, err := getNpmRoot(suitePath) if err != nil { return nil, err } @@ -120,19 +72,20 @@ func startCmdJob(ctx context.Context, newCmd func() *exec.Cmd, stdinStr *string, if err != nil { return nil, err } - return []jobs.Job{readResultsJob(ctx, mpx.SynthEvents(), newJourneyEnricher())}, nil + senr := streamEnricher{} + return []jobs.Job{readResultsJob(ctx, mpx.SynthEvents(), senr.enrich)}, nil } } // readResultsJob adapts the output of an ExecMultiplexer into a Job, that uses continuations // to read all output. -func readResultsJob(ctx context.Context, synthEvents <-chan *SynthEvent, je *journeyEnricher) jobs.Job { +func readResultsJob(ctx context.Context, synthEvents <-chan *SynthEvent, enrich enricher) jobs.Job { return func(event *beat.Event) (conts []jobs.Job, err error) { select { case se := <-synthEvents: - err = je.enrich(event, se) + err = enrich(event, se) if se != nil { - return []jobs.Job{readResultsJob(ctx, synthEvents, je)}, err + return []jobs.Job{readResultsJob(ctx, synthEvents, enrich)}, err } else { return nil, err } @@ -301,32 +254,17 @@ func jsonToSynthEvent(bytes []byte, text string) (res *SynthEvent, err error) { return } -func getSuiteDir(suiteFile string) (string, error) { - path, err := filepath.Abs(suiteFile) - if err != nil { - return "", err - } - stat, err := os.Stat(path) - if err != nil { - return "", err - } - - if stat.IsDir() { - return suiteFile, nil - } - - return filepath.Dir(suiteFile), nil -} - -func runSimpleCommand(cmd *exec.Cmd, dir string) error { - cmd.Dir = dir - logp.Info("Running %s in %s", cmd, dir) - output, err := cmd.CombinedOutput() - logp.Info("Ran %s got %s", cmd, string(output)) - return err +// getNpmRoot gets the closest ancestor path that contains package.json. +func getNpmRoot(path string) (string, error) { + return getNpmRootIn(path, path) } -func getNpmRoot(path string) (string, error) { +// getNpmRootIn does the same as getNpmRoot but remembers the original path for +// debugging. +func getNpmRootIn(path, origPath string) (string, error) { + if path == "" { + return "", fmt.Errorf("cannot check for package.json in empty path: '%s'", origPath) + } candidate := filepath.Join(path, "package.json") _, err := os.Lstat(candidate) if err == nil { @@ -335,7 +273,7 @@ func getNpmRoot(path string) (string, error) { // Try again one level up parent := filepath.Dir(path) if len(parent) < 2 { - return "", fmt.Errorf("no package.json found") + return "", fmt.Errorf("no package.json found in '%s'", origPath) } - return getNpmRoot(parent) + return getNpmRootIn(parent, origPath) } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthexec_test.go b/x-pack/heartbeat/monitors/browser/synthexec/synthexec_test.go index be8b4a8df8b..75a5ff5a497 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthexec_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthexec_test.go @@ -130,10 +130,12 @@ Loop: t.Run("has echo'd stdin to stdout", func(t *testing.T) { stdoutEvents := eventsWithType("stdout") + require.Len(t, stdoutEvents, 1) require.Equal(t, stdinStr, stdoutEvents[0].Payload["message"]) }) t.Run("has echo'd two lines to stderr", func(t *testing.T) { stdoutEvents := eventsWithType("stderr") + require.Len(t, stdoutEvents, 2) require.Equal(t, "Stderr 1", stdoutEvents[0].Payload["message"]) require.Equal(t, "Stderr 2", stdoutEvents[1].Payload["message"]) }) diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go b/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go index de0c76e1401..d612529844e 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go @@ -37,6 +37,7 @@ func (se SynthEvent) ToMap() (m common.MapStr) { "type": se.Type, "package_version": se.PackageVersion, "payload": se.Payload, + "index": se.index, }, } if se.Blob != "" { diff --git a/x-pack/heartbeat/monitors/browser/synthexec/testcmd/main.go b/x-pack/heartbeat/monitors/browser/synthexec/testcmd/main.go index 57f2a48f2cc..bd70d8d58c5 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/testcmd/main.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/testcmd/main.go @@ -8,17 +8,10 @@ import ( "bufio" "fmt" "os" + "time" ) func main() { - // Sample output to test stdout - stdin := bufio.NewReader(os.Stdin) - - stdinLine, _ := stdin.ReadString('\n') - fmt.Fprintln(os.Stdout, stdinLine) - fmt.Fprintln(os.Stderr, "Stderr 1") - fmt.Fprintln(os.Stderr, "Stderr 2") - // For sending JSON results pipe := os.NewFile(3, "pipe") @@ -28,8 +21,30 @@ func main() { os.Exit(1) } scanner := bufio.NewScanner(file) + i := 0 for scanner.Scan() { - fmt.Fprintln(pipe, scanner.Text()) + // We need to test console out within a journey context + // so we wait till the first line, a journey/start is written + // we need to make sure the these raw lines are received after + // the journey start, so, even though we're careful to use + // un-buffered I/O we sleep for a generous 100ms before and after + // to make sure these lines are in the right context + // otherwise they might get lost. + // Note, in the real world lost lines here aren't a big deal + // these only are relevant in error situations, and this is a + // pathological case + if i == 1 { + time.Sleep(time.Millisecond * 100) + stdin := bufio.NewReader(os.Stdin) + stdinLine, _ := stdin.ReadString('\n') + os.Stdout.WriteString(stdinLine + "\n") + os.Stderr.WriteString("Stderr 1\n") + os.Stderr.WriteString("Stderr 2\n") + time.Sleep(time.Millisecond * 100) + } + pipe.WriteString(scanner.Text()) + pipe.WriteString("\n") + i++ } if scanner.Err() != nil { fmt.Printf("Scanner error %s", scanner.Err()) diff --git a/x-pack/heartbeat/sample-synthetics-config/heartbeat.yml b/x-pack/heartbeat/sample-synthetics-config/heartbeat.yml index a47f798246a..bebebdb4673 100644 --- a/x-pack/heartbeat/sample-synthetics-config/heartbeat.yml +++ b/x-pack/heartbeat/sample-synthetics-config/heartbeat.yml @@ -4,33 +4,37 @@ heartbeat.config.monitors: reload.enabled: false reload.period: 5s -heartbeat.synthetic_suites: -- name: Todos suite - path: "/home/andrewvc/projects/synthetics/examples/todos" - schedule: "@every 1m" - heartbeat.monitors: +- type: browser + enabled: true + id: todos-suite + name: Todos Suite + source: + local: + path: "/home/andrewvc/projects/synthetics/examples/todos/" + schedule: '@every 1m' - type: http + enabled: true id: SimpleHTTP urls: http://www.google.com schedule: "@every 15s" name: Simple HTTP - type: browser + enabled: true id: my-monitor name: My Monitor - script: |- - step("load homepage", async () => { - await page.goto('https://www.elastic.co'); - }); - step("hover over products menu", async () => { - await page.hover('css=[data-nav-item=products]'); - }); - step("failme", async () => { - await page.hhover('css=[data-nav-item=products]'); - }); - step("skip me", async () => { - // noop - }); + source: + inline: + script: + step("load homepage", async () => { + await page.goto('https://www.elastic.co'); + }); + step("hover over products menu", async () => { + await page.hover('css=[data-nav-item=products]'); + }); + step("failme", async () => { + await page.hhover('css=[data-nav-item=products]'); + }); schedule: "@every 1m" setup.template.settings: @@ -38,9 +42,7 @@ setup.template.settings: index.codec: best_compression setup.kibana: output.elasticsearch: - hosts: - - localhost:9200 - protocol: http + hosts: "127.0.0.1:9200" username: elastic password: changeme processors: