Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #8415 to 6.x: Heartbeat autodiscover #8593

Merged
merged 1 commit into from
Oct 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff]

*Heartbeat*

- Added autodiscovery support {pull}8415[8415]

*Metricbeat*

- Add `replstatus` metricset to MongoDB module {pull}7604[7604]
Expand Down
39 changes: 32 additions & 7 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/heartbeat/config"
"github.com/elastic/beats/heartbeat/monitors"
"github.com/elastic/beats/heartbeat/scheduler"
"github.com/elastic/beats/libbeat/autodiscover"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
Expand All @@ -39,6 +40,8 @@ type Heartbeat struct {
config config.Config
scheduler *scheduler.Scheduler
monitorReloader *cfgfile.Reloader
dynamicFactory *monitors.RunnerFactory
autodiscover *autodiscover.Autodiscover
}

// New creates a new heartbeat.
Expand All @@ -64,6 +67,8 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
done: make(chan struct{}),
config: parsedConfig,
scheduler: scheduler,
// dynamicFactory is the factory used for dynamic configs, e.g. autodiscover / reload
dynamicFactory: monitors.NewFactory(scheduler, false),
}
return bt, nil
}
Expand All @@ -81,12 +86,22 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
bt.monitorReloader = cfgfile.NewReloader(b.Publisher, bt.config.ConfigMonitors)
defer bt.monitorReloader.Stop()

err := bt.RunDynamicMonitors(b)
err := bt.RunReloadableMonitors(b)
if err != nil {
return err
}
}

if bt.config.Autodiscover != nil {
bt.autodiscover, err = bt.makeAutodiscover(b)
if err != nil {
return err
}

bt.autodiscover.Start()
defer bt.autodiscover.Stop()
}

if err := bt.scheduler.Start(); err != nil {
return err
}
Expand All @@ -112,21 +127,31 @@ func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) error {
return nil
}

// RunDynamicMonitors runs the `heartbeat.config.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunDynamicMonitors(b *beat.Beat) (err error) {
factory := monitors.NewFactory(bt.scheduler, false)

// RunReloadableMonitors runs the `heartbeat.config.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunReloadableMonitors(b *beat.Beat) (err error) {
// Check monitor configs
if err := bt.monitorReloader.Check(factory); err != nil {
if err := bt.monitorReloader.Check(bt.dynamicFactory); err != nil {
return err
}

// Execute the monitor
go bt.monitorReloader.Run(factory)
go bt.monitorReloader.Run(bt.dynamicFactory)

return nil
}

// makeAutodiscover creates an autodiscover object ready to be started.
func (bt *Heartbeat) makeAutodiscover(b *beat.Beat) (*autodiscover.Autodiscover, error) {
adapter := autodiscover.NewFactoryAdapter(bt.dynamicFactory)

ad, err := autodiscover.NewAutodiscover("heartbeat", b.Publisher, adapter, bt.config.Autodiscover)
if err != nil {
return nil, err
}

return ad, nil
}

// Stop stops the beat.
func (bt *Heartbeat) Stop() {
close(bt.done)
Expand Down
8 changes: 5 additions & 3 deletions heartbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@
package config

import (
"github.com/elastic/beats/libbeat/autodiscover"
"github.com/elastic/beats/libbeat/common"
)

// Config defines the structure of heartbeat.yml.
type Config struct {
// Modules is a list of module specific configuration data.
Monitors []*common.Config `config:"monitors"`
ConfigMonitors *common.Config `config:"config.monitors"`
Scheduler Scheduler `config:"scheduler"`
Monitors []*common.Config `config:"monitors"`
ConfigMonitors *common.Config `config:"config.monitors"`
Scheduler Scheduler `config:"scheduler"`
Autodiscover *autodiscover.Config `config:"autodiscover"`
}

// Scheduler defines the syntax of a heartbeat.yml scheduler block.
Expand Down
15 changes: 15 additions & 0 deletions heartbeat/tests/system/config/heartbeat.yml.j2
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
logging.level: debug

heartbeat.monitors:
{% for monitor in monitors -%}
- type: {{ monitor.type }}
Expand Down Expand Up @@ -40,6 +42,19 @@ heartbeat.config.monitors:
reload.enabled: {{ reload|default("false")}}
{% endif -%}

{% if autodiscover %}
heartbeat.autodiscover:
providers:
{%- for provider, settings in autodiscover.items() %}
- type: {{provider}}
{%- if settings %}
{%- for k, v in settings.items() %}
{{k}}: {{v | default([])}}
{%- endfor %}
{%- endif %}
{%- endfor %}
{% endif %}

{%- if shipper_name %}
name: {{ shipper_name }}
{% endif %}
Expand Down
64 changes: 64 additions & 0 deletions heartbeat/tests/system/test_autodiscovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import os
from heartbeat import BaseTest
import unittest
import re

from beat.beat import INTEGRATION_TESTS


class TestAutodiscover(BaseTest):
"""
Test heartbeat autodiscover
"""
@unittest.skipIf(not INTEGRATION_TESTS or
os.getenv("TESTING_ENVIRONMENT") == "2x",
"integration test not available on 2.x")
def test_docker(self):
"""
Test docker autodiscover starts modules from templates
"""
import docker
docker_client = docker.from_env()

self.render_config_template(
autodiscover={
'docker': {
'templates': '''
- condition:
contains.docker.container.image: redis
config:
- type: tcp
hosts: ["${data.host}:${data.port}"]
schedule: "@every 1s"
timeout: 1s
''',
},
},
)

proc = self.start_beat()

self.wait_until(lambda: self.log_contains(
re.compile('autodiscover.+Got a start event:', re.I)))

self.wait_until(lambda: self.output_count(lambda x: x >= 1))

output = self.read_output_json()
proc.check_kill_and_wait()

matched = False
matcher = re.compile("redis", re.I)
for i, container in enumerate(docker_client.containers.list()):
for tag in container.image.tags:
if matcher.search(tag):
network_settings = container.attrs['NetworkSettings']
host = network_settings['Networks'].values()[
0]['IPAddress']
port = network_settings['Ports'].keys()[0].split("/")[0]
# Check metadata is added
expected = 'tcp-tcp@%s:%s' % (host, port)
actual = output[0]['monitor']['id']
if expected == actual:
matched = True

assert matched
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ import (
"github.com/elastic/beats/libbeat/common/bus"
)

// AutodiscoverAdapter for Metricbeat modules
type AutodiscoverAdapter struct {
// FactoryAdapter is an adapter that works with any cfgfile.RunnerFactory.
type FactoryAdapter struct {
factory cfgfile.RunnerFactory
}

// NewAutodiscoverAdapter builds and returns an autodiscover adapter for Metricbeat modules
func NewAutodiscoverAdapter(factory cfgfile.RunnerFactory) *AutodiscoverAdapter {
return &AutodiscoverAdapter{
// NewFactoryAdapter builds and returns an autodiscover adapter that works with any cfgfile.RunnerFactory.
func NewFactoryAdapter(factory cfgfile.RunnerFactory) *FactoryAdapter {
return &FactoryAdapter{
factory: factory,
}
}

// CreateConfig generates a valid list of configs from the given event, the received event will have all keys defined by `StartFilter`
func (m *AutodiscoverAdapter) CreateConfig(e bus.Event) ([]*common.Config, error) {
func (m *FactoryAdapter) CreateConfig(e bus.Event) ([]*common.Config, error) {
config, ok := e["config"].([]*common.Config)
if !ok {
return nil, errors.New("Got a wrong value in event `config` key")
Expand All @@ -48,16 +48,16 @@ func (m *AutodiscoverAdapter) CreateConfig(e bus.Event) ([]*common.Config, error
}

// CheckConfig tests given config to check if it will work or not, returns errors in case it won't work
func (m *AutodiscoverAdapter) CheckConfig(c *common.Config) error {
func (m *FactoryAdapter) CheckConfig(c *common.Config) error {
return m.factory.CheckConfig(c)
}

// Create a module or prospector from the given config
func (m *AutodiscoverAdapter) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
func (m *FactoryAdapter) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
return m.factory.Create(p, c, meta)
}

// EventFilter returns the bus filter to retrieve runner start/stop triggering events
func (m *AutodiscoverAdapter) EventFilter() []string {
func (m *FactoryAdapter) EventFilter() []string {
return []string{"config"}
}
8 changes: 8 additions & 0 deletions libbeat/tests/system/beat/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import time
import yaml
import hashlib
import re
from datetime import datetime, timedelta

from .compose import ComposeMixin
Expand All @@ -22,6 +23,8 @@

yaml_cache = {}

REGEXP_TYPE = type(re.compile("t"))


class TimeoutError(Exception):
pass
Expand Down Expand Up @@ -359,6 +362,7 @@ def log_contains_count(self, msg, logfile=None, ignore_case=False):
"""
Returns the number of appearances of the given string in the log file
"""
is_regexp = type(msg) == REGEXP_TYPE

counter = 0
if ignore_case:
Expand All @@ -371,6 +375,10 @@ def log_contains_count(self, msg, logfile=None, ignore_case=False):
try:
with open(os.path.join(self.working_dir, logfile), "r") as f:
for line in f:
if is_regexp:
if msg.search(line) is not None:
counter = counter + 1
continue
if ignore_case:
line = line.lower()
if line.find(msg) >= 0:
Expand Down
6 changes: 4 additions & 2 deletions metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
mbautodiscover "github.com/elastic/beats/metricbeat/autodiscover"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/module"

// Add autodiscover builders / appenders
_ "github.com/elastic/beats/metricbeat/autodiscover"

// Add metricbeat default processors
_ "github.com/elastic/beats/metricbeat/processor/add_kubernetes_metadata"
)
Expand Down Expand Up @@ -169,7 +171,7 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe
if config.Autodiscover != nil {
var err error
factory := module.NewFactory(metricbeat.moduleOptions...)
adapter := mbautodiscover.NewAutodiscoverAdapter(factory)
adapter := autodiscover.NewFactoryAdapter(factory)
metricbeat.autodiscover, err = autodiscover.NewAutodiscover("metricbeat", b.Publisher, adapter, config.Autodiscover)
if err != nil {
return nil, err
Expand Down