Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add leader election for autodiscover #20281

Merged
merged 22 commits into from
Aug 10, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions deploy/kubernetes/metricbeat-kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,12 @@ rules:
- "/metrics"
verbs:
- get
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- '*'
---
apiVersion: v1
kind: ServiceAccount
Expand Down
6 changes: 6 additions & 0 deletions deploy/kubernetes/metricbeat/metricbeat-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,9 @@ rules:
- "/metrics"
verbs:
- get
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- '*'
ChrsMark marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ require (
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae
golang.org/x/text v0.3.2
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/tools v0.0.0-20200701041122-1837592efa10
golang.org/x/tools v0.0.0-20200731060945-b5fad4ed8dd6
google.golang.org/api v0.15.0
google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb
google.golang.org/grpc v1.29.1
Expand Down
8 changes: 8 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type Config struct {
// Scope can be either node or cluster.
Scope string `config:"scope"`
Resource string `config:"resource"`
// Unique identifies if this provider enables its templates only when it is elected as leader in a k8s cluster
Unique bool `config:"unique"`
Identifier string `config:"identifier"`
ChrsMark marked this conversation as resolved.
Show resolved Hide resolved

Prefix string `config:"prefix"`
Hints *common.Config `config:"hints"`
Expand All @@ -60,6 +63,8 @@ func defaultConfig() *Config {
Resource: "pod",
CleanupTimeout: 60 * time.Second,
Prefix: "co.elastic",
Unique: false,
Identifier: "",
}
}

Expand Down Expand Up @@ -98,6 +103,9 @@ func (c *Config) Validate() error {
if c.Scope != "node" && c.Scope != "cluster" {
return fmt.Errorf("invalid `scope` configured. supported values are `node` and `cluster`")
}
if c.Unique && c.Scope != "cluster" {
logp.L().Warnf("can only set `unique` when scope is `cluster`")
}

return nil
}
131 changes: 124 additions & 7 deletions libbeat/autodiscover/providers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,14 @@
package kubernetes

import (
"context"
"fmt"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8s "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"

"github.com/gofrs/uuid"
"github.com/pkg/errors"
Expand All @@ -31,6 +38,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common/bus"
"github.com/elastic/beats/v7/libbeat/common/kubernetes"
"github.com/elastic/beats/v7/libbeat/common/kubernetes/k8skeystore"
"github.com/elastic/beats/v7/libbeat/conditions"
"github.com/elastic/beats/v7/libbeat/keystore"
"github.com/elastic/beats/v7/libbeat/logp"
)
Expand All @@ -49,13 +57,15 @@ type Eventer interface {

// Provider implements autodiscover provider for docker containers
type Provider struct {
config *Config
bus bus.Bus
templates template.Mapper
builders autodiscover.Builders
appenders autodiscover.Appenders
logger *logp.Logger
eventer Eventer
config *Config
bus bus.Bus
templates template.Mapper
builders autodiscover.Builders
appenders autodiscover.Appenders
logger *logp.Logger
eventer Eventer
leaderElection leaderelection.LeaderElectionConfig
cancelLeaderElection context.CancelFunc
}

// AutodiscoverBuilder builds and returns an autodiscover provider
Expand All @@ -67,11 +77,17 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore
}

config := defaultConfig()
config.Identifier = uuid.String()
err := c.Unpack(&config)
if err != nil {
return nil, errWrap(err)
}

if config.Unique {
// enrich the config with Unique templates before building Mapper
initUniqueTemplate(config)
}

client, err := kubernetes.GetKubernetesClient(config.KubeConfig)
if err != nil {
return nil, errWrap(err)
Expand Down Expand Up @@ -118,6 +134,10 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore
return nil, errWrap(err)
}

if p.config.Unique {
p.initLeaderElectionConfig(client, uuid.String())
}

return p, nil
}

Expand All @@ -126,11 +146,33 @@ func (p *Provider) Start() {
if err := p.eventer.Start(); err != nil {
p.logger.Errorf("Error starting kubernetes autodiscover provider: %s", err)
}

if p.config.Unique {
ctx, cancel := context.WithCancel(context.Background())
p.cancelLeaderElection = cancel
p.StartLeaderElector(ctx, p.leaderElection)
}
}

// StartLeaderElector starts a Leader Elector in the background with the provided config
func (p *Provider) StartLeaderElector(ctx context.Context, lec leaderelection.LeaderElectionConfig) {
le, err := leaderelection.NewLeaderElector(lec)
if err != nil {
p.logger.Errorf("leader election lock GAINED, id %v", err)
}
if lec.WatchDog != nil {
lec.WatchDog.SetLeaderElection(le)
}
p.logger.Debugf("Starting Leader Elector")
go le.Run(ctx)
}

// Stop signals the stop channel to force the watch loop routine to stop.
func (p *Provider) Stop() {
p.eventer.Stop()
if p.cancelLeaderElection != nil {
p.cancelLeaderElection()
}
}

// String returns a description of kubernetes autodiscover provider.
Expand All @@ -154,3 +196,78 @@ func (p *Provider) publish(event bus.Event) {
p.appenders.Append(event)
p.bus.Publish(event)
}

func (p *Provider) startLeading(uuid string, eventID string) {
event := bus.Event{
"start": true,
"provider": uuid,
"id": eventID,
"unique": "true",
}
if config := p.templates.GetConfig(event); config != nil {
event["config"] = config
}
p.bus.Publish(event)
}

func (p *Provider) stopLeading(uuid string, eventID string) {
event := bus.Event{
"stop": true,
"provider": uuid,
"id": eventID,
"unique": "true",
}
if config := p.templates.GetConfig(event); config != nil {
event["config"] = config
}
p.bus.Publish(event)
}

func (p *Provider) initLeaderElectionConfig(client k8s.Interface, uuid string) {
id := "beats-leader-" + p.config.Node
if p.config.Identifier != "" {
id = id + "-" + p.config.Identifier
}
lease := metav1.ObjectMeta{
Name: "beats-cluster-leader",
Namespace: "default",
}
metaUID := lease.GetObjectMeta().GetUID()
p.leaderElection = leaderelection.LeaderElectionConfig{
Lock: &resourcelock.LeaseLock{
LeaseMeta: lease,
Client: client.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: id,
},
},
ReleaseOnCancel: true,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
p.logger.Debugf("leader election lock GAINED, id %v", id)
eventID := fmt.Sprintf("%v-%v", metaUID, time.Now().UnixNano())
p.startLeading(uuid, eventID)
},
OnStoppedLeading: func() {
p.logger.Debugf("leader election lock LOST, id %v", id)
eventID := fmt.Sprintf("%v-%v", metaUID, time.Now().UnixNano())
p.stopLeading(uuid, eventID)
},
},
}
}

func initUniqueTemplate(config *Config) {
m := make(map[string]interface{})
m["unique"] = "true"
jsoriano marked this conversation as resolved.
Show resolved Hide resolved
fields := &conditions.Fields{}
fields.Unpack(m)
for _, template := range config.Templates {
template.ConditionConfig = &conditions.Config{
Contains: fields,
}
}
}