From cff3e40cfedb9521d7249caaa569668de203dc59 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Fri, 5 Oct 2018 14:55:31 -0400 Subject: [PATCH] Heartbeat autodiscover (#8415) Autodiscover support for Heartbeat + tests --- CHANGELOG.asciidoc | 2 + heartbeat/beater/heartbeat.go | 39 +++++++++-- heartbeat/config/config.go | 8 ++- .../tests/system/config/heartbeat.yml.j2 | 15 +++++ heartbeat/tests/system/test_autodiscovery.py | 64 +++++++++++++++++++ .../autodiscover/factoryadapter.go | 18 +++--- libbeat/tests/system/beat/beat.py | 8 +++ metricbeat/beater/metricbeat.go | 6 +- 8 files changed, 139 insertions(+), 21 deletions(-) create mode 100644 heartbeat/tests/system/test_autodiscovery.py rename metricbeat/autodiscover/autodiscover.go => libbeat/autodiscover/factoryadapter.go (73%) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 99447da409f..6cee98d441a 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -21,6 +21,8 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff] *Heartbeat* +- Added autodiscovery support {pull}8415[8415] + *Metricbeat* *Packetbeat* diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index f12c3cd48e1..d494b555f62 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/heartbeat/config" "github.com/elastic/beats/heartbeat/monitors" "github.com/elastic/beats/heartbeat/scheduler" + "github.com/elastic/beats/libbeat/autodiscover" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" @@ -39,6 +40,8 @@ type Heartbeat struct { config config.Config scheduler *scheduler.Scheduler monitorReloader *cfgfile.Reloader + dynamicFactory *monitors.RunnerFactory + autodiscover *autodiscover.Autodiscover } // New creates a new heartbeat. @@ -64,6 +67,8 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { done: make(chan struct{}), config: parsedConfig, scheduler: scheduler, + // dynamicFactory is the factory used for dynamic configs, e.g. autodiscover / reload + dynamicFactory: monitors.NewFactory(scheduler, false), } return bt, nil } @@ -81,12 +86,22 @@ func (bt *Heartbeat) Run(b *beat.Beat) error { bt.monitorReloader = cfgfile.NewReloader(b.Publisher, bt.config.ConfigMonitors) defer bt.monitorReloader.Stop() - err := bt.RunDynamicMonitors(b) + err := bt.RunReloadableMonitors(b) if err != nil { return err } } + if bt.config.Autodiscover != nil { + bt.autodiscover, err = bt.makeAutodiscover(b) + if err != nil { + return err + } + + bt.autodiscover.Start() + defer bt.autodiscover.Stop() + } + if err := bt.scheduler.Start(); err != nil { return err } @@ -112,21 +127,31 @@ func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) error { return nil } -// RunDynamicMonitors runs the `heartbeat.config.monitors` portion of the yaml config if present. -func (bt *Heartbeat) RunDynamicMonitors(b *beat.Beat) (err error) { - factory := monitors.NewFactory(bt.scheduler, false) - +// RunReloadableMonitors runs the `heartbeat.config.monitors` portion of the yaml config if present. +func (bt *Heartbeat) RunReloadableMonitors(b *beat.Beat) (err error) { // Check monitor configs - if err := bt.monitorReloader.Check(factory); err != nil { + if err := bt.monitorReloader.Check(bt.dynamicFactory); err != nil { return err } // Execute the monitor - go bt.monitorReloader.Run(factory) + go bt.monitorReloader.Run(bt.dynamicFactory) return nil } +// makeAutodiscover creates an autodiscover object ready to be started. +func (bt *Heartbeat) makeAutodiscover(b *beat.Beat) (*autodiscover.Autodiscover, error) { + adapter := autodiscover.NewFactoryAdapter(bt.dynamicFactory) + + ad, err := autodiscover.NewAutodiscover("heartbeat", b.Publisher, adapter, bt.config.Autodiscover) + if err != nil { + return nil, err + } + + return ad, nil +} + // Stop stops the beat. func (bt *Heartbeat) Stop() { close(bt.done) diff --git a/heartbeat/config/config.go b/heartbeat/config/config.go index 620f1fd62c3..e9b3647f833 100644 --- a/heartbeat/config/config.go +++ b/heartbeat/config/config.go @@ -21,15 +21,17 @@ package config import ( + "github.com/elastic/beats/libbeat/autodiscover" "github.com/elastic/beats/libbeat/common" ) // Config defines the structure of heartbeat.yml. type Config struct { // Modules is a list of module specific configuration data. - Monitors []*common.Config `config:"monitors"` - ConfigMonitors *common.Config `config:"config.monitors"` - Scheduler Scheduler `config:"scheduler"` + Monitors []*common.Config `config:"monitors"` + ConfigMonitors *common.Config `config:"config.monitors"` + Scheduler Scheduler `config:"scheduler"` + Autodiscover *autodiscover.Config `config:"autodiscover"` } // Scheduler defines the syntax of a heartbeat.yml scheduler block. diff --git a/heartbeat/tests/system/config/heartbeat.yml.j2 b/heartbeat/tests/system/config/heartbeat.yml.j2 index 4407d65e0c7..2405f0223dd 100644 --- a/heartbeat/tests/system/config/heartbeat.yml.j2 +++ b/heartbeat/tests/system/config/heartbeat.yml.j2 @@ -1,3 +1,5 @@ +logging.level: debug + heartbeat.monitors: {% for monitor in monitors -%} - type: {{ monitor.type }} @@ -40,6 +42,19 @@ heartbeat.config.monitors: reload.enabled: {{ reload|default("false")}} {% endif -%} +{% if autodiscover %} +heartbeat.autodiscover: + providers: + {%- for provider, settings in autodiscover.items() %} + - type: {{provider}} + {%- if settings %} + {%- for k, v in settings.items() %} + {{k}}: {{v | default([])}} + {%- endfor %} + {%- endif %} + {%- endfor %} +{% endif %} + {%- if shipper_name %} name: {{ shipper_name }} {% endif %} diff --git a/heartbeat/tests/system/test_autodiscovery.py b/heartbeat/tests/system/test_autodiscovery.py new file mode 100644 index 00000000000..555a64ddd66 --- /dev/null +++ b/heartbeat/tests/system/test_autodiscovery.py @@ -0,0 +1,64 @@ +import os +from heartbeat import BaseTest +import unittest +import re + +from beat.beat import INTEGRATION_TESTS + + +class TestAutodiscover(BaseTest): + """ + Test heartbeat autodiscover + """ + @unittest.skipIf(not INTEGRATION_TESTS or + os.getenv("TESTING_ENVIRONMENT") == "2x", + "integration test not available on 2.x") + def test_docker(self): + """ + Test docker autodiscover starts modules from templates + """ + import docker + docker_client = docker.from_env() + + self.render_config_template( + autodiscover={ + 'docker': { + 'templates': ''' + - condition: + contains.docker.container.image: redis + config: + - type: tcp + hosts: ["${data.host}:${data.port}"] + schedule: "@every 1s" + timeout: 1s + ''', + }, + }, + ) + + proc = self.start_beat() + + self.wait_until(lambda: self.log_contains( + re.compile('autodiscover.+Got a start event:', re.I))) + + self.wait_until(lambda: self.output_count(lambda x: x >= 1)) + + output = self.read_output_json() + proc.check_kill_and_wait() + + matched = False + matcher = re.compile("redis", re.I) + for i, container in enumerate(docker_client.containers.list()): + for tag in container.image.tags: + if matcher.search(tag): + network_settings = container.attrs['NetworkSettings'] + host = network_settings['Networks'].values()[ + 0]['IPAddress'] + port = network_settings['Ports'].keys()[0].split("/")[0] + # Check metadata is added + expected = 'tcp-tcp@%s:%s' % (host, port) + actual = output[0]['monitor']['id'] + if expected == actual: + matched = True + + assert matched diff --git a/metricbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/factoryadapter.go similarity index 73% rename from metricbeat/autodiscover/autodiscover.go rename to libbeat/autodiscover/factoryadapter.go index 669d8e44c39..f2a3000b8d6 100644 --- a/metricbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/factoryadapter.go @@ -26,20 +26,20 @@ import ( "github.com/elastic/beats/libbeat/common/bus" ) -// AutodiscoverAdapter for Metricbeat modules -type AutodiscoverAdapter struct { +// FactoryAdapter is an adapter that works with any cfgfile.RunnerFactory. +type FactoryAdapter struct { factory cfgfile.RunnerFactory } -// NewAutodiscoverAdapter builds and returns an autodiscover adapter for Metricbeat modules -func NewAutodiscoverAdapter(factory cfgfile.RunnerFactory) *AutodiscoverAdapter { - return &AutodiscoverAdapter{ +// NewFactoryAdapter builds and returns an autodiscover adapter that works with any cfgfile.RunnerFactory. +func NewFactoryAdapter(factory cfgfile.RunnerFactory) *FactoryAdapter { + return &FactoryAdapter{ factory: factory, } } // CreateConfig generates a valid list of configs from the given event, the received event will have all keys defined by `StartFilter` -func (m *AutodiscoverAdapter) CreateConfig(e bus.Event) ([]*common.Config, error) { +func (m *FactoryAdapter) CreateConfig(e bus.Event) ([]*common.Config, error) { config, ok := e["config"].([]*common.Config) if !ok { return nil, errors.New("Got a wrong value in event `config` key") @@ -48,16 +48,16 @@ func (m *AutodiscoverAdapter) CreateConfig(e bus.Event) ([]*common.Config, error } // CheckConfig tests given config to check if it will work or not, returns errors in case it won't work -func (m *AutodiscoverAdapter) CheckConfig(c *common.Config) error { +func (m *FactoryAdapter) CheckConfig(c *common.Config) error { return m.factory.CheckConfig(c) } // Create a module or prospector from the given config -func (m *AutodiscoverAdapter) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { +func (m *FactoryAdapter) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { return m.factory.Create(p, c, meta) } // EventFilter returns the bus filter to retrieve runner start/stop triggering events -func (m *AutodiscoverAdapter) EventFilter() []string { +func (m *FactoryAdapter) EventFilter() []string { return []string{"config"} } diff --git a/libbeat/tests/system/beat/beat.py b/libbeat/tests/system/beat/beat.py index 54b09268612..6e5659fffd5 100644 --- a/libbeat/tests/system/beat/beat.py +++ b/libbeat/tests/system/beat/beat.py @@ -10,6 +10,7 @@ import time import yaml import hashlib +import re from datetime import datetime, timedelta from .compose import ComposeMixin @@ -22,6 +23,8 @@ yaml_cache = {} +REGEXP_TYPE = type(re.compile("t")) + class TimeoutError(Exception): pass @@ -359,6 +362,7 @@ def log_contains_count(self, msg, logfile=None, ignore_case=False): """ Returns the number of appearances of the given string in the log file """ + is_regexp = type(msg) == REGEXP_TYPE counter = 0 if ignore_case: @@ -371,6 +375,10 @@ def log_contains_count(self, msg, logfile=None, ignore_case=False): try: with open(os.path.join(self.working_dir, logfile), "r") as f: for line in f: + if is_regexp: + if msg.search(line) is not None: + counter = counter + 1 + continue if ignore_case: line = line.lower() if line.find(msg) >= 0: diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index 7223fbf757b..bdd13f95978 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -32,10 +32,12 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/logp" - mbautodiscover "github.com/elastic/beats/metricbeat/autodiscover" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/module" + // Add autodiscover builders / appenders + _ "github.com/elastic/beats/metricbeat/autodiscover" + // Add metricbeat default processors _ "github.com/elastic/beats/metricbeat/processor/add_kubernetes_metadata" ) @@ -172,7 +174,7 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe if config.Autodiscover != nil { var err error factory := module.NewFactory(metricbeat.moduleOptions...) - adapter := mbautodiscover.NewAutodiscoverAdapter(factory) + adapter := autodiscover.NewFactoryAdapter(factory) metricbeat.autodiscover, err = autodiscover.NewAutodiscover("metricbeat", b.Publisher, adapter, config.Autodiscover) if err != nil { return nil, err