Skip to content

Commit

Permalink
[Heartbeat] One shot mode (#25972) (#28421)
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Oct 14, 2021
1 parent 271042c commit e54d1f7
Show file tree
Hide file tree
Showing 15 changed files with 282 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Bundle synthetics deps with heartbeat docker image. {pull}23274[23274]
- Add mime type detection for http responses. {pull}22976[22976]
- Support JSON expressions / validation of JSON arrays. {pull}28073[28073]
- Experimental 'run once' mode. {pull}25972[25972]

*Journalbeat*

Expand Down
12 changes: 12 additions & 0 deletions heartbeat/_meta/config/beat.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ heartbeat.monitors:
# Name of corresponding APM service, if Elastic APM is in use for the monitored service.
#service.name: my-apm-service-name

# Experimental: Configure monitors that run exactly once.
# If enabled, heartbeat.monitors will be ignored
# Heartbeat will run these monitors once then exit.
#heartbeat.run_once:
#- type: http
#id: my-monitor
#name: My Monitor
#urls: ["http://localhost:9200"]
# NOTE: you must still provide the schedule field! Heartbeat
# Uses this to determine the contents of the monitor.timespan field
#schedule: '@every 10s'

{{header "Elasticsearch template setting"}}

setup.template.settings:
Expand Down
72 changes: 71 additions & 1 deletion heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ package beater
import (
"errors"
"fmt"
"sync"
"syscall"
"time"

"github.com/elastic/beats/v7/heartbeat/config"
"github.com/elastic/beats/v7/heartbeat/hbregistry"
"github.com/elastic/beats/v7/heartbeat/monitors"
"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
"github.com/elastic/beats/v7/heartbeat/scheduler"
"github.com/elastic/beats/v7/libbeat/autodiscover"
"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -34,6 +37,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/x-pack/functionbeat/function/core"

_ "github.com/elastic/beats/v7/libbeat/processors/script"
)
Expand Down Expand Up @@ -81,10 +85,17 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
// Run executes the beat.
func (bt *Heartbeat) Run(b *beat.Beat) error {
logp.Info("heartbeat is running! Hit CTRL-C to stop it.")

groups, _ := syscall.Getgroups()
logp.Info("Effective user/group ids: %d/%d, with groups: %v", syscall.Geteuid(), syscall.Getegid(), groups)

if bt.config.RunOnce != nil {
err := bt.runRunOnce(b)
if err != nil {
return err
}
return nil
}

stopStaticMonitors, err := bt.RunStaticMonitors(b)
if err != nil {
return err
Expand Down Expand Up @@ -126,6 +137,65 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
return nil
}

// runRunOnce runs the given config then exits immediately after any queued events have been sent to ES
func (bt *Heartbeat) runRunOnce(b *beat.Beat) error {
logp.Info("Starting run_once run. This is an experimental feature and may be changed or removed in the future!")
cfgs := bt.config.RunOnce

publishClient, err := core.NewSyncClient(logp.NewLogger("run_once mode"), b.Publisher, beat.ClientConfig{})
if err != nil {
return fmt.Errorf("could not create sync client: %w", err)
}
defer publishClient.Close()

wg := &sync.WaitGroup{}
for _, cfg := range cfgs {
err := runRunOnceSingleConfig(cfg, publishClient, wg)
if err != nil {
logp.Warn("error running run_once config: %s", err)
}
}

wg.Wait()
publishClient.Wait()

logp.Info("Ending run_once run")

return nil
}

func runRunOnceSingleConfig(cfg *common.Config, publishClient *core.SyncClient, wg *sync.WaitGroup) (err error) {
sf, err := stdfields.ConfigToStdMonitorFields(cfg)
if err != nil {
return fmt.Errorf("could not get stdmon fields: %w", err)
}
pluginFactory, exists := plugin.GlobalPluginsReg.Get(sf.Type)
if !exists {
return fmt.Errorf("no plugin for type: %s", sf.Type)
}
plugin, err := pluginFactory.Make(sf.Type, cfg)
if err != nil {
return err
}

results := plugin.RunWrapped(sf)

wg.Add(1)
go func() {
defer wg.Done()
defer plugin.Close()
for {
event := <-results
if event == nil {
break
}
publishClient.Publish(*event)
}
}()

return nil
}

// RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) (stop func(), err error) {
factory := monitors.NewFactory(b.Info, bt.scheduler, true)
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

// Config defines the structure of heartbeat.yml.
type Config struct {
// Modules is a list of module specific configuration data.
RunOnce []*common.Config `config:"run_once"`
Monitors []*common.Config `config:"monitors"`
ConfigMonitors *common.Config `config:"config.monitors"`
Scheduler Scheduler `config:"scheduler"`
Expand Down
34 changes: 34 additions & 0 deletions heartbeat/docs/heartbeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,37 @@ include::monitors/monitor-tcp.asciidoc[]
include::monitors/monitor-http.asciidoc[]

include::monitors/monitor-browser.asciidoc[]

[float]
[[run-once-mode]]
=== Run Once Mode (Experimental)

You can configure {beatname_uc} run monitors exactly once then exit, bypassing the scheduler. This is referred to as running {beatname_uc} in "run once" mode. This is an experimental feature
and is subject to change.

[source,yaml]
----------------------------------------------------------------------
# heartbeat.yml
heartbeat.run_once:
- type: icmp
id: ping-myhost
name: My Host Ping
hosts: ["myhost"]
# Note that schedule is still needed to inform heartbeat when the next
# expected check is to be run. This is needed to populate the monitor.timespan field used by the Uptime app.
schedule: '@every 5s'
- type: tcp
id: myhost-tcp-echo
name: My Host TCP Echo
hosts: ["myhost:777"] # default TCP Echo Protocol
check.send: "Check"
check.receive: "Check"
schedule: '@every 5s'
- type: http
id: service-status
name: Service Status
service.name: my-apm-service-name
hosts: ["http://localhost:80/service/status"]
check.response.status: [200]
schedule: '@every 5s'
----------------------------------------------------------------------
12 changes: 12 additions & 0 deletions heartbeat/heartbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ heartbeat.monitors:
# Name of corresponding APM service, if Elastic APM is in use for the monitored service.
#service.name: my-apm-service-name

# Experimental: Configure monitors that run exactly once.
# If enabled, heartbeat.monitors will be ignored
# Heartbeat will run these monitors once then exit.
#heartbeat.run_once:
#- type: http
#id: my-monitor
#name: My Monitor
#urls: ["http://localhost:9200"]
# NOTE: you must still provide the schedule field! Heartbeat
# Uses this to determine the contents of the monitor.timespan field
#schedule: '@every 10s'

# ======================= Elasticsearch template setting =======================

setup.template.settings:
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/monitors/active/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func create(
js[i] = wrappers.WithURLField(u, job)
}

return plugin.Plugin{Jobs: js, Close: nil, Endpoints: len(config.Hosts)}, nil
return plugin.Plugin{Jobs: js, Endpoints: len(config.Hosts)}, nil
}

func newRoundTripper(config *Config) (http.RoundTripper, error) {
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/monitors/active/icmp/icmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (jf *jobFactory) makePlugin() (plugin2 plugin.Plugin, err error) {
j = append(j, wrappers.WithURLField(u, job))
}

return plugin.Plugin{Jobs: j, Close: nil, Endpoints: len(jf.config.Hosts)}, nil
return plugin.Plugin{Jobs: j, Endpoints: len(jf.config.Hosts)}, nil
}

func (jf *jobFactory) pingIPFactory(config *Config) func(*net.IPAddr) jobs.Job {
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/monitors/active/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func createWithResolver(
return plugin.Plugin{}, err
}

return plugin.Plugin{Jobs: js, Close: nil, Endpoints: len(jc.endpoints)}, nil
return plugin.Plugin{Jobs: js, Endpoints: len(jc.endpoints)}, nil
}

// jobFactory is where most of the logic here lives. It provides a common context around
Expand Down
4 changes: 2 additions & 2 deletions heartbeat/monitors/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int, *atomic.Int) {
return plugin.PluginFactory{
Name: "test",
Aliases: []string{"testAlias"},
Builder: func(s string, config *common.Config) (plugin.Plugin, error) {
Make: func(s string, config *common.Config) (plugin.Plugin, error) {
built.Inc()
// Declare a real config block with a required attr so we can see what happens when it doesn't work
unpacked := struct {
Expand All @@ -160,7 +160,7 @@ func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int, *atomic.Int) {
closed.Inc()
return nil
}
return plugin.Plugin{Jobs: j, Close: closer, Endpoints: 1}, err
return plugin.Plugin{Jobs: j, DoClose: closer, Endpoints: 1}, err
},
Stats: plugin.NewPluginCountersRecorder("test", reg)},
built,
Expand Down
56 changes: 49 additions & 7 deletions heartbeat/monitors/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,67 @@ import (

"github.com/elastic/beats/v7/heartbeat/hbregistry"
"github.com/elastic/beats/v7/heartbeat/monitors/jobs"
"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/plugin"
)

// PluginFactory represents an uninstantiated plug in instance generated from a monitor config. Invoking the Make function creates a plug-in instance.
type PluginFactory struct {
Name string
Aliases []string
Builder PluginFactoryCreate
Make PluginMake
Stats RegistryRecorder
}

type PluginFactoryCreate func(string, *common.Config) (p Plugin, err error)
type PluginMake func(string, *common.Config) (p Plugin, err error)

// Plugin describes a configured instance of a plug-in with its jobs already instantiated.
type Plugin struct {
Jobs []jobs.Job
Close func() error
DoClose func() error
Endpoints int
}

// Close closes the plugin, invoking any DoClose hooks if avialable.
func (p Plugin) Close() error {
if p.DoClose != nil {
return p.DoClose()
}
return nil
}

// RunWrapped runs the plug-in with the provided wrappers returning a channel of resultant events.
func (p Plugin) RunWrapped(fields stdfields.StdMonitorFields) chan *beat.Event {
wj := wrappers.WrapCommon(p.Jobs, fields)
results := make(chan *beat.Event)

var runJob func(j jobs.Job)
runJob = func(j jobs.Job) {
e := &beat.Event{}
conts, err := j(e)
// No error handling since WrapCommon handles all errors
if err != nil {
panic(fmt.Sprintf("unexpected error on wrapped job!: %s", err))
}
results <- e
for _, c := range conts {
runJob(c)
}
}

go func() {
for _, j := range wj {
runJob(j)
}
close(results)
}()

return results
}

var pluginKey = "heartbeat.monitor"

// stateGlobalRecorder records statistics across all plugin types
Expand All @@ -70,7 +112,7 @@ func init() {
}

stats := statsForPlugin(p.Name)
return GlobalPluginsReg.Register(PluginFactory{p.Name, p.Aliases, p.Builder, stats})
return GlobalPluginsReg.Register(PluginFactory{p.Name, p.Aliases, p.Make, stats})
})
}

Expand Down Expand Up @@ -98,9 +140,9 @@ func NewPluginsReg() *PluginsReg {
}

// Register registers a new active (as opposed to passive) monitor.
func Register(name string, builder PluginFactoryCreate, aliases ...string) {
func Register(name string, make PluginMake, aliases ...string) {
stats := statsForPlugin(name)
if err := GlobalPluginsReg.Add(PluginFactory{name, aliases, builder, stats}); err != nil {
if err := GlobalPluginsReg.Add(PluginFactory{name, aliases, make, stats}); err != nil {
panic(err)
}
}
Expand Down Expand Up @@ -161,5 +203,5 @@ func (r *PluginsReg) MonitorNames() []string {
}

func (e *PluginFactory) Create(cfg *common.Config) (p Plugin, err error) {
return e.Builder(e.Name, cfg)
return e.Make(e.Name, cfg)
}
Loading

0 comments on commit e54d1f7

Please sign in to comment.