Skip to content

Commit

Permalink
Heartbeat autodiscover (elastic#8415)
Browse files Browse the repository at this point in the history
Autodiscover support for Heartbeat + tests
  • Loading branch information
andrewvc authored Oct 5, 2018
1 parent 694e010 commit cff3e40
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 21 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]

*Heartbeat*

- Added autodiscovery support {pull}8415[8415]

*Metricbeat*

*Packetbeat*
Expand Down
39 changes: 32 additions & 7 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/heartbeat/config"
"github.com/elastic/beats/heartbeat/monitors"
"github.com/elastic/beats/heartbeat/scheduler"
"github.com/elastic/beats/libbeat/autodiscover"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
Expand All @@ -39,6 +40,8 @@ type Heartbeat struct {
config config.Config
scheduler *scheduler.Scheduler
monitorReloader *cfgfile.Reloader
dynamicFactory *monitors.RunnerFactory
autodiscover *autodiscover.Autodiscover
}

// New creates a new heartbeat.
Expand All @@ -64,6 +67,8 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
done: make(chan struct{}),
config: parsedConfig,
scheduler: scheduler,
// dynamicFactory is the factory used for dynamic configs, e.g. autodiscover / reload
dynamicFactory: monitors.NewFactory(scheduler, false),
}
return bt, nil
}
Expand All @@ -81,12 +86,22 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
bt.monitorReloader = cfgfile.NewReloader(b.Publisher, bt.config.ConfigMonitors)
defer bt.monitorReloader.Stop()

err := bt.RunDynamicMonitors(b)
err := bt.RunReloadableMonitors(b)
if err != nil {
return err
}
}

if bt.config.Autodiscover != nil {
bt.autodiscover, err = bt.makeAutodiscover(b)
if err != nil {
return err
}

bt.autodiscover.Start()
defer bt.autodiscover.Stop()
}

if err := bt.scheduler.Start(); err != nil {
return err
}
Expand All @@ -112,21 +127,31 @@ func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) error {
return nil
}

// RunDynamicMonitors runs the `heartbeat.config.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunDynamicMonitors(b *beat.Beat) (err error) {
factory := monitors.NewFactory(bt.scheduler, false)

// RunReloadableMonitors runs the `heartbeat.config.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunReloadableMonitors(b *beat.Beat) (err error) {
// Check monitor configs
if err := bt.monitorReloader.Check(factory); err != nil {
if err := bt.monitorReloader.Check(bt.dynamicFactory); err != nil {
return err
}

// Execute the monitor
go bt.monitorReloader.Run(factory)
go bt.monitorReloader.Run(bt.dynamicFactory)

return nil
}

// makeAutodiscover creates an autodiscover object ready to be started.
func (bt *Heartbeat) makeAutodiscover(b *beat.Beat) (*autodiscover.Autodiscover, error) {
adapter := autodiscover.NewFactoryAdapter(bt.dynamicFactory)

ad, err := autodiscover.NewAutodiscover("heartbeat", b.Publisher, adapter, bt.config.Autodiscover)
if err != nil {
return nil, err
}

return ad, nil
}

// Stop stops the beat.
func (bt *Heartbeat) Stop() {
close(bt.done)
Expand Down
8 changes: 5 additions & 3 deletions heartbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@
package config

import (
"github.com/elastic/beats/libbeat/autodiscover"
"github.com/elastic/beats/libbeat/common"
)

// Config defines the structure of heartbeat.yml.
type Config struct {
// Modules is a list of module specific configuration data.
Monitors []*common.Config `config:"monitors"`
ConfigMonitors *common.Config `config:"config.monitors"`
Scheduler Scheduler `config:"scheduler"`
Monitors []*common.Config `config:"monitors"`
ConfigMonitors *common.Config `config:"config.monitors"`
Scheduler Scheduler `config:"scheduler"`
Autodiscover *autodiscover.Config `config:"autodiscover"`
}

// Scheduler defines the syntax of a heartbeat.yml scheduler block.
Expand Down
15 changes: 15 additions & 0 deletions heartbeat/tests/system/config/heartbeat.yml.j2
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
logging.level: debug

heartbeat.monitors:
{% for monitor in monitors -%}
- type: {{ monitor.type }}
Expand Down Expand Up @@ -40,6 +42,19 @@ heartbeat.config.monitors:
reload.enabled: {{ reload|default("false")}}
{% endif -%}

{% if autodiscover %}
heartbeat.autodiscover:
providers:
{%- for provider, settings in autodiscover.items() %}
- type: {{provider}}
{%- if settings %}
{%- for k, v in settings.items() %}
{{k}}: {{v | default([])}}
{%- endfor %}
{%- endif %}
{%- endfor %}
{% endif %}

{%- if shipper_name %}
name: {{ shipper_name }}
{% endif %}
Expand Down
64 changes: 64 additions & 0 deletions heartbeat/tests/system/test_autodiscovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import os
from heartbeat import BaseTest
import unittest
import re

from beat.beat import INTEGRATION_TESTS


class TestAutodiscover(BaseTest):
"""
Test heartbeat autodiscover
"""
@unittest.skipIf(not INTEGRATION_TESTS or
os.getenv("TESTING_ENVIRONMENT") == "2x",
"integration test not available on 2.x")
def test_docker(self):
"""
Test docker autodiscover starts modules from templates
"""
import docker
docker_client = docker.from_env()

self.render_config_template(
autodiscover={
'docker': {
'templates': '''
- condition:
contains.docker.container.image: redis
config:
- type: tcp
hosts: ["${data.host}:${data.port}"]
schedule: "@every 1s"
timeout: 1s
''',
},
},
)

proc = self.start_beat()

self.wait_until(lambda: self.log_contains(
re.compile('autodiscover.+Got a start event:', re.I)))

self.wait_until(lambda: self.output_count(lambda x: x >= 1))

output = self.read_output_json()
proc.check_kill_and_wait()

matched = False
matcher = re.compile("redis", re.I)
for i, container in enumerate(docker_client.containers.list()):
for tag in container.image.tags:
if matcher.search(tag):
network_settings = container.attrs['NetworkSettings']
host = network_settings['Networks'].values()[
0]['IPAddress']
port = network_settings['Ports'].keys()[0].split("/")[0]
# Check metadata is added
expected = 'tcp-tcp@%s:%s' % (host, port)
actual = output[0]['monitor']['id']
if expected == actual:
matched = True

assert matched
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ import (
"github.com/elastic/beats/libbeat/common/bus"
)

// AutodiscoverAdapter for Metricbeat modules
type AutodiscoverAdapter struct {
// FactoryAdapter is an adapter that works with any cfgfile.RunnerFactory.
type FactoryAdapter struct {
factory cfgfile.RunnerFactory
}

// NewAutodiscoverAdapter builds and returns an autodiscover adapter for Metricbeat modules
func NewAutodiscoverAdapter(factory cfgfile.RunnerFactory) *AutodiscoverAdapter {
return &AutodiscoverAdapter{
// NewFactoryAdapter builds and returns an autodiscover adapter that works with any cfgfile.RunnerFactory.
func NewFactoryAdapter(factory cfgfile.RunnerFactory) *FactoryAdapter {
return &FactoryAdapter{
factory: factory,
}
}

// CreateConfig generates a valid list of configs from the given event, the received event will have all keys defined by `StartFilter`
func (m *AutodiscoverAdapter) CreateConfig(e bus.Event) ([]*common.Config, error) {
func (m *FactoryAdapter) CreateConfig(e bus.Event) ([]*common.Config, error) {
config, ok := e["config"].([]*common.Config)
if !ok {
return nil, errors.New("Got a wrong value in event `config` key")
Expand All @@ -48,16 +48,16 @@ func (m *AutodiscoverAdapter) CreateConfig(e bus.Event) ([]*common.Config, error
}

// CheckConfig tests given config to check if it will work or not, returns errors in case it won't work
func (m *AutodiscoverAdapter) CheckConfig(c *common.Config) error {
func (m *FactoryAdapter) CheckConfig(c *common.Config) error {
return m.factory.CheckConfig(c)
}

// Create a module or prospector from the given config
func (m *AutodiscoverAdapter) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
func (m *FactoryAdapter) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
return m.factory.Create(p, c, meta)
}

// EventFilter returns the bus filter to retrieve runner start/stop triggering events
func (m *AutodiscoverAdapter) EventFilter() []string {
func (m *FactoryAdapter) EventFilter() []string {
return []string{"config"}
}
8 changes: 8 additions & 0 deletions libbeat/tests/system/beat/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import time
import yaml
import hashlib
import re
from datetime import datetime, timedelta

from .compose import ComposeMixin
Expand All @@ -22,6 +23,8 @@

yaml_cache = {}

REGEXP_TYPE = type(re.compile("t"))


class TimeoutError(Exception):
pass
Expand Down Expand Up @@ -359,6 +362,7 @@ def log_contains_count(self, msg, logfile=None, ignore_case=False):
"""
Returns the number of appearances of the given string in the log file
"""
is_regexp = type(msg) == REGEXP_TYPE

counter = 0
if ignore_case:
Expand All @@ -371,6 +375,10 @@ def log_contains_count(self, msg, logfile=None, ignore_case=False):
try:
with open(os.path.join(self.working_dir, logfile), "r") as f:
for line in f:
if is_regexp:
if msg.search(line) is not None:
counter = counter + 1
continue
if ignore_case:
line = line.lower()
if line.find(msg) >= 0:
Expand Down
6 changes: 4 additions & 2 deletions metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
mbautodiscover "github.com/elastic/beats/metricbeat/autodiscover"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/module"

// Add autodiscover builders / appenders
_ "github.com/elastic/beats/metricbeat/autodiscover"

// Add metricbeat default processors
_ "github.com/elastic/beats/metricbeat/processor/add_kubernetes_metadata"
)
Expand Down Expand Up @@ -172,7 +174,7 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe
if config.Autodiscover != nil {
var err error
factory := module.NewFactory(metricbeat.moduleOptions...)
adapter := mbautodiscover.NewAutodiscoverAdapter(factory)
adapter := autodiscover.NewFactoryAdapter(factory)
metricbeat.autodiscover, err = autodiscover.NewAutodiscover("metricbeat", b.Publisher, adapter, config.Autodiscover)
if err != nil {
return nil, err
Expand Down

0 comments on commit cff3e40

Please sign in to comment.