Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Beta AWS ELB Autodiscovery Provider #8680

Closed
wants to merge 17 commits into from
29 changes: 17 additions & 12 deletions heartbeat/heartbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Temporary changes while I work on this feature.

# Configure monitors
heartbeat.monitors:
- type: http

# List or urls to query
urls: ["http://localhost:9200"]

# Configure task schedule
schedule: '@every 10s'

# Total test connection and data exchange timeout
#timeout: 16s
heartbeat.autodiscover:
providers:
- type: aws_elb
region: us-east-1
templates:
- condition:
has_fields: ["port"]
config:
- type: tcp
hosts: ["${data.host}:${data.port}"]
schedule: "@every 5s"
timeout: 1s

#==================== Elasticsearch template setting ==========================

Expand Down Expand Up @@ -92,9 +95,11 @@ setup.kibana:
# Configure what output to use when sending the data collected by the beat.

#-------------------------- Elasticsearch output ------------------------------
output.elasticsearch:
output.console:
pretty: true
#output.elasticsearch:
# Array of hosts to connect to.
hosts: ["localhost:9200"]
# hosts: ["localhost:9200"]

# Optional protocol and basic auth credentials.
#protocol: "https"
Expand Down Expand Up @@ -128,7 +133,7 @@ processors:

# Sets log level. The default log level is info.
# Available log levels are: error, warning, info, debug
#logging.level: debug
logging.level: info

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
Expand Down
33 changes: 27 additions & 6 deletions libbeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"time"

"github.com/mitchellh/hashstructure"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/autodiscover/meta"
Expand Down Expand Up @@ -163,11 +165,10 @@ func (a *Autodiscover) handleStart(event bus.Event) bool {
logp.Debug(debugK, "Got a start event: %v, generated configs: %+v", event, configs)

meta := getMeta(event)
for _, config := range configs {
hash, err := cfgfile.HashConfig(config)
for configIdx, config := range configs {
hash, err := hashKeyOrConfig(event, configIdx, config)
if err != nil {
logp.Debug(debugK, "Could not hash config %v: %v", config, err)
continue
logp.Error(err)
}

err = a.adapter.CheckConfig(config)
Expand Down Expand Up @@ -204,8 +205,12 @@ func (a *Autodiscover) handleStop(event bus.Event) bool {
}
logp.Debug(debugK, "Got a stop event: %v, generated configs: %+v", event, configs)

for _, config := range configs {
hash, err := cfgfile.HashConfig(config)
for configIdx, config := range configs {
hash, err := hashKeyOrConfig(event, configIdx, config)
if err != nil {
logp.Error(err)
}

if err != nil {
logp.Debug(debugK, "Could not hash config %v: %v", config, err)
continue
Expand All @@ -227,6 +232,22 @@ func (a *Autodiscover) handleStop(event bus.Event) bool {
return updated
}

// Hash using the special "hashKey" key plus the index of the key if available, otherwise hash the given config
func hashKeyOrConfig(event bus.Event, configIdx int, config *common.Config) (hash uint64, err error) {
if hk, ok := event["hashKey"]; ok {
hash, err = hashstructure.Hash(fmt.Sprintf("%s,%d", hk, configIdx), nil)
if err != nil {
errors.Wrapf(err, "Could not hash key %s:", hk)
}
} else {
hash, err = cfgfile.HashConfig(config)
if err != nil {
errors.Wrapf(err, "Could not hash config %v", config)
}
}
return hash, err
}

func getMeta(event bus.Event) common.MapStr {
m := event["meta"]
if m == nil {
Expand Down
1 change: 1 addition & 0 deletions libbeat/autodiscover/providers/aws/aws.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package aws
40 changes: 40 additions & 0 deletions libbeat/autodiscover/providers/aws/elb/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package elb

import (
"github.com/elastic/beats/libbeat/autodiscover/template"
"github.com/elastic/beats/libbeat/common"
)

// Config for docker autodiscover provider
type Config struct {
Region string `config:"region" validate:"required"`
Type string `config:"type"`
HintsEnabled bool `config:"hints.enabled"`
Builders []*common.Config `config:"builders"`
Appenders []*common.Config `config:"appenders"`
Templates template.MapperSettings `config:"templates"`
}

func defaultConfig() *Config {
return &Config{}
}

func (c *Config) Validate() {
}
132 changes: 132 additions & 0 deletions libbeat/autodiscover/providers/aws/elb/fetch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package elb

import (
"sync"

"github.com/aws/aws-sdk-go-v2/service/elbv2"
"go.uber.org/multierr"

"github.com/elastic/beats/libbeat/common/atomic"
)

type Fetcher interface {
fetch() ([]*lbListener, error)
}

type APIFetcher struct {
client *elbv2.ELBV2
}

func NewAPIFetcher(client *elbv2.ELBV2) Fetcher {
return &APIFetcher{client}
}

func (f *APIFetcher) fetch() ([]*lbListener, error) {
var pageSize int64 = 50
req := f.client.DescribeLoadBalancersRequest(&elbv2.DescribeLoadBalancersInput{PageSize: &pageSize})

// Limit concurrency against the AWS API to 5
taskPool := sync.Pool{}
for i := 0; i < 5; i++ {
taskPool.Put(nil)
}

ir := &fetchRequest{
req.Paginate(),
f.client,
atomic.MakeBool(true),
[]*lbListener{},
[]error{},
sync.Mutex{},
taskPool,
sync.WaitGroup{},
}

return ir.fetch()
}

type fetchRequest struct {
paginator elbv2.DescribeLoadBalancersPager
client *elbv2.ELBV2
running atomic.Bool
lbListeners []*lbListener
errs []error
resultsLock sync.Mutex
taskPool sync.Pool
pendingTasks sync.WaitGroup
}

func (p *fetchRequest) fetch() ([]*lbListener, error) {
p.dispatch(p.fetchNextPage)

p.pendingTasks.Wait()

// Acquire the results lock to ensure memory
// consistency between the last write and this read
p.resultsLock.Lock()
defer p.resultsLock.Unlock()

if len(p.errs) > 0 {
return nil, multierr.Combine(p.errs...)
}

return p.lbListeners, nil
}

func (p *fetchRequest) fetchNextPage() {
if !p.running.Load() {
return
}

if p.paginator.Next() {
for _, lb := range p.paginator.CurrentPage().LoadBalancers {
p.dispatch(func() { p.fetchListeners(lb) })
}
}

if p.paginator.Err() != nil {
p.recordErrResult(p.paginator.Err())
}
}

func (p *fetchRequest) dispatch(fn func()) {
p.pendingTasks.Add(1)

go func() {
slot := p.taskPool.Get()
defer p.taskPool.Put(slot)
defer p.pendingTasks.Done()

fn()
}()
}

func (p *fetchRequest) recordGoodResult(lb *elbv2.LoadBalancer, lbl *elbv2.Listener) {
p.resultsLock.Lock()
defer p.resultsLock.Unlock()

p.lbListeners = append(p.lbListeners, &lbListener{lb, lbl})
}

func (p *fetchRequest) recordErrResult(err error) {
p.resultsLock.Lock()
defer p.resultsLock.Unlock()

p.errs = append(p.errs, err)

// Try to stop execution early
p.running.Store(false)
}

func (p *fetchRequest) fetchListeners(lb elbv2.LoadBalancer) {
listenReq := p.client.DescribeListenersRequest(&elbv2.DescribeListenersInput{LoadBalancerArn: lb.LoadBalancerArn})
listen := listenReq.Paginate()
for listen.Next() && p.running.Load() {
for _, listener := range listen.CurrentPage().Listeners {
p.recordGoodResult(&lb, &listener)
}
}
if listen.Err() != nil {
p.recordErrResult(listen.Err())
}
}
61 changes: 61 additions & 0 deletions libbeat/autodiscover/providers/aws/elb/lblistener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package elb

import (
"fmt"

"github.com/aws/aws-sdk-go-v2/service/elbv2"

"github.com/elastic/beats/libbeat/common"
)

func lblToMap(l *elbv2.LoadBalancer, listener *elbv2.Listener) {

}

type lbListener struct {
lb *elbv2.LoadBalancer
listener *elbv2.Listener
}

func (l *lbListener) toMap() common.MapStr {
m := common.MapStr{}

m["host"] = *l.lb.DNSName
m["port"] = *l.listener.Port
m["type"] = string(l.lb.Type)
m["scheme"] = l.lb.Scheme
m["availability_zones"] = l.azStrings()
m["created"] = l.lb.CreatedTime
m["state"] = l.stateMap()
m["load_balancer_arn"] = *l.lb.LoadBalancerArn
m["ip_address_type"] = string(l.lb.IpAddressType)
m["security_groups"] = l.lb.SecurityGroups
m["vpc_id"] = *l.lb.VpcId
m["protocol"] = l.listener.Protocol
m["ssl_policy"] = l.listener.SslPolicy

return m
}

func (l *lbListener) uuid() string {
return fmt.Sprintf("%s|%s", *l.lb.LoadBalancerArn, *l.listener.ListenerArn)
}

func (l *lbListener) azStrings() []string {
azs := l.lb.AvailabilityZones
res := make([]string, 0, len(azs))
for _, az := range azs {
res = append(res, *az.ZoneName)
}
return res
}

func (l *lbListener) stateMap() (stateMap common.MapStr) {
state := l.lb.State
stateMap = common.MapStr{}
if state.Reason != nil {
stateMap["reason"] = *state.Reason
}
stateMap["code"] = state.Code
return stateMap
}
Loading