Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add autodiscover for aws_ec2 #14823

Merged
merged 29 commits into from
Feb 6, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
3506757
Add autodiscover for aws_ec2
kaiyan-sheng Nov 27, 2019
07469cd
Add aws_ec2 autodiscover
kaiyan-sheng Nov 27, 2019
dfc6269
test with logp.Debug
kaiyan-sheng Nov 27, 2019
4d350ea
Merge remote-tracking branch 'upstream/master' into autodiscover_aws_ec2
kaiyan-sheng Dec 6, 2019
e5899f8
Add comment to DefaultConfig function
kaiyan-sheng Dec 6, 2019
fcdba2f
Merge remote-tracking branch 'upstream/master' into autodiscover_aws_ec2
kaiyan-sheng Dec 17, 2019
c042a08
Change ec2 metadata mapping
kaiyan-sheng Dec 17, 2019
5662544
Update changelog
kaiyan-sheng Dec 17, 2019
a4e7f4f
Update changelog
kaiyan-sheng Dec 17, 2019
580c13a
change instanceId to instanceID
kaiyan-sheng Dec 17, 2019
f06521d
Collect from all regions if regions is not specified
kaiyan-sheng Dec 18, 2019
3a5fa25
Add GetRegions into aws_elb provider
kaiyan-sheng Dec 18, 2019
fef6fde
Add aws_ec2 provider into autodiscover doc
kaiyan-sheng Dec 19, 2019
543e18a
update autodiscover doc
kaiyan-sheng Dec 19, 2019
4605f1f
try to fix doc
kaiyan-sheng Dec 19, 2019
556830a
Add unit tests
kaiyan-sheng Dec 19, 2019
0f7afa6
add comments
kaiyan-sheng Dec 19, 2019
6af9ca2
Fix metricbeat doc
kaiyan-sheng Dec 19, 2019
672c42b
Make houndci happy
kaiyan-sheng Dec 19, 2019
cd459b4
Merge remote-tracking branch 'upstream/master' into autodiscover_aws_ec2
kaiyan-sheng Jan 3, 2020
397dd24
Merge remote-tracking branch 'upstream/master' into autodiscover_aws_ec2
kaiyan-sheng Jan 8, 2020
6d13377
Merge remote-tracking branch 'upstream/master' into autodiscover_aws_ec2
kaiyan-sheng Jan 24, 2020
780a34a
Merge remote-tracking branch 'upstream/master' into autodiscover_aws_ec2
kaiyan-sheng Feb 3, 2020
8e70d89
update changelog
kaiyan-sheng Feb 3, 2020
eb0d83f
update changelog
kaiyan-sheng Feb 3, 2020
6505d1a
Merge remote-tracking branch 'upstream/master' into autodiscover_aws_ec2
kaiyan-sheng Feb 5, 2020
48f121c
Add aws prefix to meta ec2
kaiyan-sheng Feb 6, 2020
656379e
update field names from ec2 to aws.ec2
kaiyan-sheng Feb 6, 2020
286d2ad
Add aws.ec2.* to autodiscover template
kaiyan-sheng Feb 6, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions x-pack/libbeat/autodiscover/providers/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,15 @@
// you may not use this file except in compliance with the Elastic License.

package aws

// SafeStrp makes handling AWS *string types easier.
// The AWS lib never returns plain strings, always using pointers, probably for memory efficiency reasons.
// This is a bit odd, because strings are just pointers into byte arrays, however this is the choice they've made.
// This will return the plain version of the given string or an empty string if the pointer is null
func SafeStrp(strp *string) string {
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
if strp == nil {
return ""
}

return *strp
}
41 changes: 41 additions & 0 deletions x-pack/libbeat/autodiscover/providers/aws/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package aws

import (
"time"

"github.com/elastic/beats/x-pack/libbeat/common/aws"

"github.com/elastic/beats/libbeat/autodiscover/template"
"github.com/elastic/beats/libbeat/common"
)

// Config for all aws autodiscover providers.
type Config struct {
Type string `config:"type"`

// Standard autodiscover fields.

// Hints are currently not supported, but may be implemented in a later release
HintsEnabled bool `config:"hints.enabled"`
Builders []*common.Config `config:"builders"`
Appenders []*common.Config `config:"appenders"`
Templates template.MapperSettings `config:"templates"`
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved

// Period defines how often to poll the AWS API.
Period time.Duration `config:"period" validate:"nonzero,required"`

// AWS Specific autodiscover fields
Regions []string `config:"regions" validate:"required"`
AWSConfig aws.ConfigAWS `config:",inline"`
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
}

// DefaultConfig for all aws autodiscover providers.
func DefaultConfig() *Config {
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
return &Config{
Period: time.Minute,
}
}
11 changes: 11 additions & 0 deletions x-pack/libbeat/autodiscover/providers/aws/ec2/_meta/fields.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
- key: ec2_listener
title: "EC2 Listener"
description: >
AWS EC2 Listeners
short_config: false
release: experimental
fields:
- name: ec2_listener
type: group
description: >
Represents an AWS EC2 Listener, e.g. state of an EC2.
83 changes: 83 additions & 0 deletions x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package ec2

import (
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws"
)

type ec2Instance struct {
ec2Instance ec2.Instance
}

// toMap converts this ec2Instance into the form consumed as metadata in the autodiscovery process.
func (i *ec2Instance) toMap() common.MapStr {
instanceType, err := i.ec2Instance.InstanceType.MarshalValue()
if err != nil {
logp.Error(errors.Wrap(err, "MarshalValue failed for instance type: "))
}

monitoringState, err := i.ec2Instance.Monitoring.State.MarshalValue()
if err != nil {
logp.Error(errors.Wrap(err, "MarshalValue failed for monitoring state: "))
}

architecture, err := i.ec2Instance.Architecture.MarshalValue()
if err != nil {
logp.Error(errors.Wrap(err, "MarshalValue failed for architecture: "))
}

m := common.MapStr{
"instance_id": awsauto.SafeStrp(i.ec2Instance.InstanceId),
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
"image_id": awsauto.SafeStrp(i.ec2Instance.ImageId),
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
"vpc_id": awsauto.SafeStrp(i.ec2Instance.VpcId),
"subnet_id": awsauto.SafeStrp(i.ec2Instance.SubnetId),
"type": instanceType,
"private_ip": awsauto.SafeStrp(i.ec2Instance.PrivateIpAddress),
"private_dns_name": awsauto.SafeStrp(i.ec2Instance.PrivateDnsName),
"public_ip": awsauto.SafeStrp(i.ec2Instance.PublicIpAddress),
"public_dns_name": awsauto.SafeStrp(i.ec2Instance.PublicDnsName),
"monitoring_state": monitoringState,
"architecture": architecture,
"root_device_name": awsauto.SafeStrp(i.ec2Instance.RootDeviceName),
"kernel_id": awsauto.SafeStrp(i.ec2Instance.KernelId),
"state": i.stateMap(),
}

for _, tag := range i.ec2Instance.Tags {
m.Put("tags."+awsauto.SafeStrp(tag.Key), awsauto.SafeStrp(tag.Value))
}
return m
}

func (i *ec2Instance) toCloudMap() common.MapStr {
m := common.MapStr{}
availabilityZone := awsauto.SafeStrp(i.ec2Instance.Placement.AvailabilityZone)
m["availability_zone"] = availabilityZone
m["provider"] = "aws"

// The region is just an AZ with the last character removed
m["region"] = availabilityZone[:len(availabilityZone)-1]
return m
}

// stateMap converts the State part of the ec2 struct into a friendlier map with 'reason' and 'code' fields.
func (i *ec2Instance) stateMap() (stateMap common.MapStr) {
state := i.ec2Instance.State
stateMap = common.MapStr{}
nameString, err := state.Name.MarshalValue()
if err != nil {
logp.Error(errors.Wrap(err, "MarshalValue failed for instance state name: "))
}

stateMap["name"] = nameString
stateMap["code"] = state.Code
return stateMap
}
230 changes: 230 additions & 0 deletions x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package ec2

import (
"context"
"sync"

"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface"
"go.uber.org/multierr"

"github.com/elastic/beats/libbeat/logp"
awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws"
)

const logSelector = "autodiscover-ec2-fetch"

// fetcher is an interface that can fetch a list of ec2Instance objects without pagination being necessary.
type fetcher interface {
fetch(ctx context.Context) ([]*ec2Instance, error)
}

// apiMultiFetcher fetches results from multiple clients concatenating their results together
// Useful since we have a fetcher per region, this combines them.
type apiMultiFetcher struct {
fetchers []fetcher
}

func (amf *apiMultiFetcher) fetch(ctx context.Context) ([]*ec2Instance, error) {
fetchResults := make(chan []*ec2Instance)
fetchErr := make(chan error)

// Simultaneously fetch all from each region
for _, f := range amf.fetchers {
go func(f fetcher) {
res, err := f.fetch(ctx)
if err != nil {
fetchErr <- err
} else {
fetchResults <- res
}
}(f)
}

var results []*ec2Instance
var errs []error

for pending := len(amf.fetchers); pending > 0; pending-- {
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
select {
case r := <-fetchResults:
results = append(results, r...)
case e := <-fetchErr:
errs = append(errs, e)
}
}

return results, multierr.Combine(errs...)
}

// apiFetcher is a concrete implementation of fetcher that hits the real AWS API.
type apiFetcher struct {
client ec2iface.ClientAPI
}

func newAPIFetcher(clients []ec2iface.ClientAPI) fetcher {
fetchers := make([]fetcher, len(clients))
for idx, client := range clients {
fetchers[idx] = &apiFetcher{client}
}
return &apiMultiFetcher{fetchers}
}

// fetch attempts to request the full list of ec2Instance objects.
// It accomplishes this by fetching a page of load balancers, then one go routine
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
// per listener API request. Each page of results has O(n)+1 perf since we need that
// additional fetch per EC2. We let the goroutine scheduler sort things out, and use
// a sync.Pool to limit the number of in-flight requests.
func (f *apiFetcher) fetch(ctx context.Context) ([]*ec2Instance, error) {
var MaxResults int64 = 50

describeInstanceInput := &ec2.DescribeInstancesInput{MaxResults: &MaxResults}
req := f.client.DescribeInstancesRequest(describeInstanceInput)

ctx, cancel := context.WithCancel(ctx)
ir := &fetchRequest{
paginator: ec2.NewDescribeInstancesPaginator(req),
client: f.client,
taskPool: sync.Pool{},
context: ctx,
cancel: cancel,
}

// Limit concurrency against the AWS API by creating a pool of objects
// This is hard coded for now. The concurrency limit of 10 was set semi-arbitrarily.
for i := 0; i < 10; i++ {
ir.taskPool.Put(nil)
}

return ir.fetch()
}

// fetchRequest provides a way to get all pages from a
// ec2.DescribeInstancesPaginator and all listeners for the given EC2 instance.
type fetchRequest struct {
paginator ec2.DescribeInstancesPaginator
client ec2iface.ClientAPI
ec2Instances []*ec2Instance
errs []error
resultsLock sync.Mutex
taskPool sync.Pool
pendingTasks sync.WaitGroup
context context.Context
cancel func()
}

func (p *fetchRequest) fetch() ([]*ec2Instance, error) {
p.dispatch(p.fetchAllPages)

// Only fetch future pages when there are no longer requests in-flight from a previous page
p.pendingTasks.Wait()

// Acquire the results lock to ensure memory
// consistency between the last write and this read
p.resultsLock.Lock()
defer p.resultsLock.Unlock()

// Since everything is async we have to retrieve any errors that occurred from here
if len(p.errs) > 0 {
return nil, multierr.Combine(p.errs...)
}

return p.ec2Instances, nil
}

func (p *fetchRequest) fetchAllPages() {
// Keep fetching pages unless we're stopped OR there are no pages left
for {
select {
case <-p.context.Done():
logp.Debug(logSelector, "done fetching EC2 instances, context cancelled")
return
default:
if !p.fetchNextPage() {
logp.Debug(logSelector, "fetched all EC2 instances")
return
}
logp.Debug(logSelector, "fetched EC2 instance")
}
}
}

func (p *fetchRequest) fetchNextPage() (more bool) {
success := p.paginator.Next(p.context)

if success {
for _, reservation := range p.paginator.CurrentPage().Reservations {
for _, instance := range reservation.Instances {
p.dispatch(func() { p.fetchInstances(instance) })
}
}
}

if p.paginator.Err() != nil {
p.recordErrResult(p.paginator.Err())
}

return success
}

// dispatch runs the given func in a new goroutine, properly throttling requests
// with the taskPool and also managing the pendingTasks waitGroup to ensure all
// results are accumulated.
func (p *fetchRequest) dispatch(fn func()) {
p.pendingTasks.Add(1)

go func() {
slot := p.taskPool.Get()
defer p.taskPool.Put(slot)
defer p.pendingTasks.Done()

fn()
}()
}

func (p *fetchRequest) fetchInstances(instance ec2.Instance) {
describeInstancesInput := &ec2.DescribeInstancesInput{InstanceIds: []string{awsauto.SafeStrp(instance.InstanceId)}}
req := p.client.DescribeInstancesRequest(describeInstancesInput)
listen := ec2.NewDescribeInstancesPaginator(req)

if listen.Err() != nil {
p.recordErrResult(listen.Err())
}

for {
select {
case <-p.context.Done():
return
default:
if !listen.Next(p.context) {
return
}

for _, reservation := range listen.CurrentPage().Reservations {
for _, instance := range reservation.Instances {
p.recordGoodResult(instance)
}
}
}

}
}

func (p *fetchRequest) recordGoodResult(instance ec2.Instance) {
p.resultsLock.Lock()
defer p.resultsLock.Unlock()

p.ec2Instances = append(p.ec2Instances, &ec2Instance{instance})
}

func (p *fetchRequest) recordErrResult(err error) {
p.resultsLock.Lock()
defer p.resultsLock.Unlock()

p.errs = append(p.errs, err)

p.cancel()
}
Loading