Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add leader election for autodiscover #20281

Merged
merged 22 commits into from
Aug 10, 2020
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Added the `max_cached_sessions` option to the script processor. {pull}19562[19562]
- Add support for DNS over TLS for the dns_processor. {pull}19321[19321]
- Set index.max_docvalue_fields_search in index template to increase value to 200 fields. {issue}20215[20215]
- Add leader election for Kubernetes autodiscover. {pull}20281[20281]
- Add capability of enriching process metadata with contianer id also for non-privileged containers in `add_process_metadata` processor. {pull}19767[19767]

*Auditbeat*
Expand Down
6 changes: 6 additions & 0 deletions deploy/kubernetes/metricbeat-kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,12 @@ rules:
- "/metrics"
verbs:
- get
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- '*'
---
apiVersion: v1
kind: ServiceAccount
Expand Down
6 changes: 6 additions & 0 deletions deploy/kubernetes/metricbeat/metricbeat-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,9 @@ rules:
- "/metrics"
verbs:
- get
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- '*'
ChrsMark marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ require (
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae
golang.org/x/text v0.3.2
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/tools v0.0.0-20200701041122-1837592efa10
golang.org/x/tools v0.0.0-20200806022845-90696ccdc692
google.golang.org/api v0.15.0
google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb
google.golang.org/grpc v1.29.1
Expand Down
2 changes: 1 addition & 1 deletion libbeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewAutodiscover(
// Init providers
var providers []Provider
for _, providerCfg := range config.Providers {
provider, err := Registry.BuildProvider(bus, providerCfg, keystore)
provider, err := Registry.BuildProvider(name, bus, providerCfg, keystore)
if err != nil {
return nil, errors.Wrap(err, "error in autodiscover provider settings")
}
Expand Down
6 changes: 3 additions & 3 deletions libbeat/autodiscover/autodiscover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestAutodiscover(t *testing.T) {
// Register mock autodiscover provider
busChan := make(chan bus.Bus, 1)
Registry = NewRegistry()
Registry.AddProvider("mock", func(b bus.Bus, uuid uuid.UUID, c *common.Config, k keystore.Keystore) (Provider, error) {
Registry.AddProvider("mock", func(beatName string, b bus.Bus, uuid uuid.UUID, c *common.Config, k keystore.Keystore) (Provider, error) {
// intercept bus to mock events
busChan <- b

Expand Down Expand Up @@ -259,7 +259,7 @@ func TestAutodiscoverHash(t *testing.T) {
busChan := make(chan bus.Bus, 1)

Registry = NewRegistry()
Registry.AddProvider("mock", func(b bus.Bus, uuid uuid.UUID, c *common.Config, k keystore.Keystore) (Provider, error) {
Registry.AddProvider("mock", func(beatName string, b bus.Bus, uuid uuid.UUID, c *common.Config, k keystore.Keystore) (Provider, error) {
// intercept bus to mock events
busChan <- b

Expand Down Expand Up @@ -323,7 +323,7 @@ func TestAutodiscoverWithConfigCheckFailures(t *testing.T) {
// Register mock autodiscover provider
busChan := make(chan bus.Bus, 1)
Registry = NewRegistry()
Registry.AddProvider("mock", func(b bus.Bus, uuid uuid.UUID, c *common.Config, k keystore.Keystore) (Provider, error) {
Registry.AddProvider("mock", func(beatName string, b bus.Bus, uuid uuid.UUID, c *common.Config, k keystore.Keystore) (Provider, error) {
// intercept bus to mock events
busChan <- b

Expand Down
6 changes: 3 additions & 3 deletions libbeat/autodiscover/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Provider interface {
}

// ProviderBuilder creates a new provider based on the given config and returns it
type ProviderBuilder func(bus.Bus, uuid.UUID, *common.Config, keystore.Keystore) (Provider, error)
type ProviderBuilder func(string, bus.Bus, uuid.UUID, *common.Config, keystore.Keystore) (Provider, error)

// AddProvider registers a new ProviderBuilder
func (r *registry) AddProvider(name string, provider ProviderBuilder) error {
Expand Down Expand Up @@ -70,7 +70,7 @@ func (r *registry) GetProvider(name string) ProviderBuilder {
}

// BuildProvider reads provider configuration and instantiate one
func (r *registry) BuildProvider(bus bus.Bus, c *common.Config, keystore keystore.Keystore) (Provider, error) {
func (r *registry) BuildProvider(beatName string, bus bus.Bus, c *common.Config, keystore keystore.Keystore) (Provider, error) {
var config ProviderConfig
err := c.Unpack(&config)
if err != nil {
Expand All @@ -87,5 +87,5 @@ func (r *registry) BuildProvider(bus bus.Bus, c *common.Config, keystore keystor
return nil, err
}

return builder(bus, uuid, c, keystore)
return builder(beatName, bus, uuid, c, keystore)
}
8 changes: 7 additions & 1 deletion libbeat/autodiscover/providers/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,13 @@ type Provider struct {
}

// AutodiscoverBuilder builds and returns an autodiscover provider
func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore keystore.Keystore) (autodiscover.Provider, error) {
func AutodiscoverBuilder(
beatName string,
bus bus.Bus,
uuid uuid.UUID,
c *common.Config,
keystore keystore.Keystore,
) (autodiscover.Provider, error) {
logger := logp.NewLogger("docker")

errWrap := func(err error) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestDockerStart(t *testing.T) {
s := &template.MapperSettings{nil, nil}
config.Templates = *s
k, _ := keystore.NewFileKeystore("test")
provider, err := AutodiscoverBuilder(bus, UUID, common.MustNewConfigFrom(config), k)
provider, err := AutodiscoverBuilder("mockBeat", bus, UUID, common.MustNewConfigFrom(config), k)
if err != nil {
t.Fatal(err)
}
Expand Down
8 changes: 7 additions & 1 deletion libbeat/autodiscover/providers/jolokia/jolokia.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,13 @@ type Provider struct {

// AutodiscoverBuilder builds a Jolokia Discovery autodiscover provider, it fails if
// there is some problem with the configuration
func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore keystore.Keystore) (autodiscover.Provider, error) {
func AutodiscoverBuilder(
beatName string,
bus bus.Bus,
uuid uuid.UUID,
c *common.Config,
keystore keystore.Keystore,
) (autodiscover.Provider, error) {
errWrap := func(err error) error {
return errors.Wrap(err, "error setting up jolokia autodiscover provider")
}
Expand Down
7 changes: 7 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type Config struct {
// Scope can be either node or cluster.
Scope string `config:"scope"`
Resource string `config:"resource"`
// Unique identifies if this provider enables its templates only when it is elected as leader in a k8s cluster
Unique bool `config:"unique"`
LeaderLease string `config:"leader_lease"`

Prefix string `config:"prefix"`
Hints *common.Config `config:"hints"`
Expand All @@ -60,6 +63,7 @@ func defaultConfig() *Config {
Resource: "pod",
CleanupTimeout: 60 * time.Second,
Prefix: "co.elastic",
Unique: false,
}
}

Expand Down Expand Up @@ -98,6 +102,9 @@ func (c *Config) Validate() error {
if c.Scope != "node" && c.Scope != "cluster" {
return fmt.Errorf("invalid `scope` configured. supported values are `node` and `cluster`")
}
if c.Unique && c.Scope != "cluster" {
logp.L().Warnf("can only set `unique` when scope is `cluster`")
}

return nil
}
Loading