From 35067574fc3f1850e58808afdaa571783a19d144 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Wed, 27 Nov 2019 07:54:15 -0700 Subject: [PATCH 01/22] Add autodiscover for aws_ec2 --- .../autodiscover/providers/aws/config.go | 41 ++++++++ .../providers/aws/ec2/_meta/fields.yml | 0 .../autodiscover/providers/aws/ec2/ec2.go | 98 +++++++++++++++++++ .../autodiscover/providers/aws/ec2/fetch.go | 1 + .../providers/aws/ec2/provider.go | 1 + .../autodiscover/providers/aws/ec2/watch.go | 1 + 6 files changed, 142 insertions(+) create mode 100644 x-pack/libbeat/autodiscover/providers/aws/config.go create mode 100644 x-pack/libbeat/autodiscover/providers/aws/ec2/_meta/fields.yml create mode 100644 x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go create mode 100644 x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go create mode 100644 x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go create mode 100644 x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go diff --git a/x-pack/libbeat/autodiscover/providers/aws/config.go b/x-pack/libbeat/autodiscover/providers/aws/config.go new file mode 100644 index 00000000000..77866cfdb26 --- /dev/null +++ b/x-pack/libbeat/autodiscover/providers/aws/config.go @@ -0,0 +1,41 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package ec2 + +import ( + "time" + + "github.com/elastic/beats/x-pack/libbeat/common/aws" + + "github.com/elastic/beats/libbeat/autodiscover/template" + "github.com/elastic/beats/libbeat/common" +) + +// Config for the aws_ec2 autodiscover provider. +type Config struct { + Type string `config:"type"` + + // Standard autodiscover fields. + + // Hints are currently not supported, but may be implemented in a later release + HintsEnabled bool `config:"hints.enabled"` + Builders []*common.Config `config:"builders"` + Appenders []*common.Config `config:"appenders"` + Templates template.MapperSettings `config:"templates"` + + // Period defines how often to poll the AWS API. + Period time.Duration `config:"period" validate:"nonzero,required"` + + // AWS Specific autodiscover fields + + Regions []string `config:"regions" validate:"required"` + AWSConfig aws.ConfigAWS `config:",inline"` +} + +func defaultConfig() *Config { + return &Config{ + Period: time.Minute, + } +} diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/_meta/fields.yml b/x-pack/libbeat/autodiscover/providers/aws/ec2/_meta/fields.yml new file mode 100644 index 00000000000..e69de29bb2d diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go new file mode 100644 index 00000000000..aa642d762fc --- /dev/null +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go @@ -0,0 +1,98 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package ec2 + +import ( + "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2" + + "github.com/elastic/beats/libbeat/common" +) + +// lbListener is a tuple type representing an elasticloadbalancingv2.Listener and its associated elasticloadbalancingv2.LoadBalancer. +type lbListener struct { + lb *elasticloadbalancingv2.LoadBalancer + listener *elasticloadbalancingv2.Listener +} + +// toMap converts this lbListener into the form consumed as metadata in the autodiscovery process. +func (l *lbListener) toMap() common.MapStr { + // We fully spell out listener_arn to avoid confusion with the ARN for the whole ELB + m := common.MapStr{ + "listener_arn": l.listener.ListenerArn, + "load_balancer_arn": safeStrp(l.lb.LoadBalancerArn), + "host": safeStrp(l.lb.DNSName), + "protocol": l.listener.Protocol, + "type": string(l.lb.Type), + "scheme": l.lb.Scheme, + "availability_zones": l.azStrings(), + "created": l.lb.CreatedTime, + "state": l.stateMap(), + "ip_address_type": string(l.lb.IpAddressType), + "security_groups": l.lb.SecurityGroups, + "vpc_id": safeStrp(l.lb.VpcId), + "ssl_policy": l.listener.SslPolicy, + } + + if l.listener.Port != nil { + m["port"] = *l.listener.Port + } + + return m +} + +// safeStrp makes handling AWS *string types easier. +// The AWS lib never returns plain strings, always using pointers, probably for memory efficiency reasons. +// This is a bit odd, because strings are just pointers into byte arrays, however this is the choice they've made. +// This will return the plain version of the given string or an empty string if the pointer is null +func safeStrp(strp *string) string { + if strp == nil { + return "" + } + + return *strp +} + +func (l *lbListener) toCloudMap() common.MapStr { + m := common.MapStr{} + + var azs []string + for _, az := range l.lb.AvailabilityZones { + azs = append(azs, *az.ZoneName) + } + m["availability_zone"] = azs + m["provider"] = "aws" + + // The region is just an AZ with the last character removed + firstAz := azs[0] + m["region"] = firstAz[:len(firstAz)-2] + + return m +} + +// arn returns a globally unique ID. In the case of an lbListener, that would be its listenerArn. +func (l *lbListener) arn() string { + return *l.listener.ListenerArn +} + +// azStrings transforms the weird list of availability zone string pointers to a slice of plain strings. +func (l *lbListener) azStrings() []string { + azs := l.lb.AvailabilityZones + res := make([]string, 0, len(azs)) + for _, az := range azs { + res = append(res, *az.ZoneName) + } + return res +} + +// stateMap converts the State part of the lb struct into a friendlier map with 'reason' and 'code' fields. +func (l *lbListener) stateMap() (stateMap common.MapStr) { + state := l.lb.State + stateMap = common.MapStr{} + if state.Reason != nil { + stateMap["reason"] = *state.Reason + } + stateMap["code"] = state.Code + return stateMap +} diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go new file mode 100644 index 00000000000..e4673404274 --- /dev/null +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go @@ -0,0 +1 @@ +package ec2 diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go new file mode 100644 index 00000000000..e4673404274 --- /dev/null +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go @@ -0,0 +1 @@ +package ec2 diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go new file mode 100644 index 00000000000..e4673404274 --- /dev/null +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go @@ -0,0 +1 @@ +package ec2 From 07469cd545ddb8713eb6bf466000d8246eefdf0d Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Wed, 27 Nov 2019 09:45:40 -0700 Subject: [PATCH 02/22] Add aws_ec2 autodiscover --- .../libbeat/autodiscover/providers/aws/aws.go | 12 + .../autodiscover/providers/aws/config.go | 4 +- .../providers/aws/ec2/_meta/fields.yml | 11 + .../autodiscover/providers/aws/ec2/ec2.go | 144 ++++++----- .../autodiscover/providers/aws/ec2/fetch.go | 229 ++++++++++++++++++ .../providers/aws/ec2/provider.go | 148 +++++++++++ .../autodiscover/providers/aws/ec2/watch.go | 104 ++++++++ x-pack/libbeat/cmd/inject.go | 1 + 8 files changed, 588 insertions(+), 65 deletions(-) diff --git a/x-pack/libbeat/autodiscover/providers/aws/aws.go b/x-pack/libbeat/autodiscover/providers/aws/aws.go index 34e92c6addb..240ecb988ae 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/aws.go +++ b/x-pack/libbeat/autodiscover/providers/aws/aws.go @@ -3,3 +3,15 @@ // you may not use this file except in compliance with the Elastic License. package aws + +// SafeStrp makes handling AWS *string types easier. +// The AWS lib never returns plain strings, always using pointers, probably for memory efficiency reasons. +// This is a bit odd, because strings are just pointers into byte arrays, however this is the choice they've made. +// This will return the plain version of the given string or an empty string if the pointer is null +func SafeStrp(strp *string) string { + if strp == nil { + return "" + } + + return *strp +} diff --git a/x-pack/libbeat/autodiscover/providers/aws/config.go b/x-pack/libbeat/autodiscover/providers/aws/config.go index 77866cfdb26..ce712c821b9 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/config.go +++ b/x-pack/libbeat/autodiscover/providers/aws/config.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package ec2 +package aws import ( "time" @@ -34,7 +34,7 @@ type Config struct { AWSConfig aws.ConfigAWS `config:",inline"` } -func defaultConfig() *Config { +func DefaultConfig() *Config { return &Config{ Period: time.Minute, } diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/_meta/fields.yml b/x-pack/libbeat/autodiscover/providers/aws/ec2/_meta/fields.yml index e69de29bb2d..d634a6697df 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/_meta/fields.yml +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/_meta/fields.yml @@ -0,0 +1,11 @@ +- key: ec2_listener + title: "EC2 Listener" + description: > + AWS EC2 Listeners + short_config: false + release: experimental + fields: + - name: ec2_listener + type: group + description: > + Represents an AWS EC2 Listener, e.g. state of an EC2. diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go index aa642d762fc..2e9692dbce0 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go @@ -5,94 +5,112 @@ package ec2 import ( - "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2" + "github.com/aws/aws-sdk-go-v2/service/ec2" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" ) -// lbListener is a tuple type representing an elasticloadbalancingv2.Listener and its associated elasticloadbalancingv2.LoadBalancer. -type lbListener struct { - lb *elasticloadbalancingv2.LoadBalancer - listener *elasticloadbalancingv2.Listener +type ec2Instance struct { + ec2Instance ec2.Instance + logger *logp.Logger } -// toMap converts this lbListener into the form consumed as metadata in the autodiscovery process. -func (l *lbListener) toMap() common.MapStr { - // We fully spell out listener_arn to avoid confusion with the ARN for the whole ELB - m := common.MapStr{ - "listener_arn": l.listener.ListenerArn, - "load_balancer_arn": safeStrp(l.lb.LoadBalancerArn), - "host": safeStrp(l.lb.DNSName), - "protocol": l.listener.Protocol, - "type": string(l.lb.Type), - "scheme": l.lb.Scheme, - "availability_zones": l.azStrings(), - "created": l.lb.CreatedTime, - "state": l.stateMap(), - "ip_address_type": string(l.lb.IpAddressType), - "security_groups": l.lb.SecurityGroups, - "vpc_id": safeStrp(l.lb.VpcId), - "ssl_policy": l.listener.SslPolicy, +// toMap converts this ec2Instance into the form consumed as metadata in the autodiscovery process. +func (i *ec2Instance) toMap() common.MapStr { + instanceType, err := i.ec2Instance.InstanceType.MarshalValue() + if err != nil { + i.logger.Error("MarshalValue failed for instance type: ", err) } - if l.listener.Port != nil { - m["port"] = *l.listener.Port + monitoringState, err := i.ec2Instance.Monitoring.State.MarshalValue() + if err != nil { + i.logger.Error("MarshalValue failed for monitoring state: ", err) } - return m -} - -// safeStrp makes handling AWS *string types easier. -// The AWS lib never returns plain strings, always using pointers, probably for memory efficiency reasons. -// This is a bit odd, because strings are just pointers into byte arrays, however this is the choice they've made. -// This will return the plain version of the given string or an empty string if the pointer is null -func safeStrp(strp *string) string { - if strp == nil { - return "" + architecture, err := i.ec2Instance.Architecture.MarshalValue() + if err != nil { + i.logger.Error("MarshalValue failed for architecture: ", err) } - return *strp + m := common.MapStr{ + "image_id": awsauto.SafeStrp(i.ec2Instance.ImageId), + "vpc_id": awsauto.SafeStrp(i.ec2Instance.VpcId), + "subnet_id": awsauto.SafeStrp(i.ec2Instance.SubnetId), + "host_id": awsauto.SafeStrp(i.ec2Instance.Placement.HostId), + "group_name": awsauto.SafeStrp(i.ec2Instance.Placement.GroupName), + "arn": awsauto.SafeStrp(i.ec2Instance.IamInstanceProfile.Arn), + "instance_id": awsauto.SafeStrp(i.ec2Instance.IamInstanceProfile.Id), + "type": instanceType, + "private_ip": awsauto.SafeStrp(i.ec2Instance.PrivateIpAddress), + "private_dns_name": awsauto.SafeStrp(i.ec2Instance.PrivateDnsName), + "public_ip": awsauto.SafeStrp(i.ec2Instance.PublicIpAddress), + "public_dns_name": awsauto.SafeStrp(i.ec2Instance.PublicDnsName), + "monitoring_state": monitoringState, + "architecture": architecture, + "root_device_name": awsauto.SafeStrp(i.ec2Instance.RootDeviceName), + "kernel_id": awsauto.SafeStrp(i.ec2Instance.KernelId), + "state": i.stateMap(), + "state_reason": i.stateReasonMap(), + "tags": i.tagMap(), + } + return m } -func (l *lbListener) toCloudMap() common.MapStr { +func (i *ec2Instance) toCloudMap() common.MapStr { m := common.MapStr{} - - var azs []string - for _, az := range l.lb.AvailabilityZones { - azs = append(azs, *az.ZoneName) - } - m["availability_zone"] = azs + availabilityZone := awsauto.SafeStrp(i.ec2Instance.Placement.AvailabilityZone) + m["availability_zone"] = availabilityZone m["provider"] = "aws" // The region is just an AZ with the last character removed - firstAz := azs[0] - m["region"] = firstAz[:len(firstAz)-2] - + m["region"] = availabilityZone[:len(availabilityZone)-2] return m } -// arn returns a globally unique ID. In the case of an lbListener, that would be its listenerArn. -func (l *lbListener) arn() string { - return *l.listener.ListenerArn +// arn returns a globally unique ID. In the case of an ec2Instance, that would be its listenerArn. +func (i *ec2Instance) arn() string { + return awsauto.SafeStrp(i.ec2Instance.IamInstanceProfile.Arn) } -// azStrings transforms the weird list of availability zone string pointers to a slice of plain strings. -func (l *lbListener) azStrings() []string { - azs := l.lb.AvailabilityZones - res := make([]string, 0, len(azs)) - for _, az := range azs { - res = append(res, *az.ZoneName) - } - return res -} - -// stateMap converts the State part of the lb struct into a friendlier map with 'reason' and 'code' fields. -func (l *lbListener) stateMap() (stateMap common.MapStr) { - state := l.lb.State +// stateMap converts the State part of the ec2 struct into a friendlier map with 'reason' and 'code' fields. +func (i *ec2Instance) stateMap() (stateMap common.MapStr) { + state := i.ec2Instance.State stateMap = common.MapStr{} - if state.Reason != nil { - stateMap["reason"] = *state.Reason + nameString, err := state.Name.MarshalValue() + if err != nil { + i.logger.Error("MarshalValue failed for instance state name: ", err) } + + stateMap["name"] = nameString stateMap["code"] = state.Code + + stateReason := i.ec2Instance.StateReason + stateMap["state_reason"] = awsauto.SafeStrp(stateReason.Code) + stateMap[""] = awsauto.SafeStrp(stateReason.Message) return stateMap } + +// stateReasonMap converts the State Reason part of the ec2 struct into a friendlier map with 'reason' and 'code' fields. +func (i *ec2Instance) stateReasonMap() (stateReasonMap common.MapStr) { + stateReasonMap = common.MapStr{} + stateReason := i.ec2Instance.StateReason + stateReasonMap["code"] = awsauto.SafeStrp(stateReason.Code) + stateReasonMap["message"] = awsauto.SafeStrp(stateReason.Message) + return stateReasonMap +} + +// stateMap converts the State part of the ec2 struct into a friendlier map with 'reason' and 'code' fields. +func (i *ec2Instance) tagMap() (tagsMap []common.MapStr) { + tags := i.ec2Instance.Tags + tagsMap = []common.MapStr{} + tagPair := common.MapStr{} + for _, tag := range tags { + tagPair["key"] = awsauto.SafeStrp(tag.Key) + tagPair["value"] = awsauto.SafeStrp(tag.Value) + tagsMap = append(tagsMap, tagPair) + } + + return tagsMap +} diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go index e4673404274..60ae04fcee2 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go @@ -1 +1,230 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + package ec2 + +import ( + "context" + "sync" + + "github.com/aws/aws-sdk-go-v2/service/ec2" + "github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface" + "go.uber.org/multierr" + awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" + "github.com/elastic/beats/libbeat/logp" +) + +const logSelector = "autodiscover-ec2-fetch" + +// fetcher is an interface that can fetch a list of ec2Instance objects without pagination being necessary. +type fetcher interface { + fetch(ctx context.Context) ([]*ec2Instance, error) +} + +// apiMultiFetcher fetches results from multiple clients concatenating their results together +// Useful since we have a fetcher per region, this combines them. +type apiMultiFetcher struct { + fetchers []fetcher +} + +func (amf *apiMultiFetcher) fetch(ctx context.Context) ([]*ec2Instance, error) { + fetchResults := make(chan []*ec2Instance) + fetchErr := make(chan error) + + // Simultaneously fetch all from each region + for _, f := range amf.fetchers { + go func(f fetcher) { + res, err := f.fetch(ctx) + if err != nil { + fetchErr <- err + } else { + fetchResults <- res + } + }(f) + } + + var results []*ec2Instance + var errs []error + + for pending := len(amf.fetchers); pending > 0; pending-- { + select { + case r := <-fetchResults: + results = append(results, r...) + case e := <-fetchErr: + errs = append(errs, e) + } + } + + return results, multierr.Combine(errs...) +} + +// apiFetcher is a concrete implementation of fetcher that hits the real AWS API. +type apiFetcher struct { + client ec2iface.ClientAPI +} + +func newAPIFetcher(clients []ec2iface.ClientAPI) fetcher { + fetchers := make([]fetcher, len(clients)) + for idx, client := range clients { + fetchers[idx] = &apiFetcher{client} + } + return &apiMultiFetcher{fetchers} +} + +// fetch attempts to request the full list of ec2Instance objects. +// It accomplishes this by fetching a page of load balancers, then one go routine +// per listener API request. Each page of results has O(n)+1 perf since we need that +// additional fetch per EC2. We let the goroutine scheduler sort things out, and use +// a sync.Pool to limit the number of in-flight requests. +func (f *apiFetcher) fetch(ctx context.Context) ([]*ec2Instance, error) { + var MaxResults int64 = 50 + + describeInstanceInput := &ec2.DescribeInstancesInput{MaxResults: &MaxResults} + req := f.client.DescribeInstancesRequest(describeInstanceInput) + + ctx, cancel := context.WithCancel(ctx) + ir := &fetchRequest{ + paginator: ec2.NewDescribeInstancesPaginator(req), + client: f.client, + taskPool: sync.Pool{}, + context: ctx, + cancel: cancel, + } + + // Limit concurrency against the AWS API by creating a pool of objects + // This is hard coded for now. The concurrency limit of 10 was set semi-arbitrarily. + for i := 0; i < 10; i++ { + ir.taskPool.Put(nil) + } + + return ir.fetch() +} + +// fetchRequest provides a way to get all pages from a +// ec2.DescribeInstancesPaginator and all listeners for the given EC2 instance. +type fetchRequest struct { + paginator ec2.DescribeInstancesPaginator + client ec2iface.ClientAPI + ec2Instances []*ec2Instance + errs []error + resultsLock sync.Mutex + taskPool sync.Pool + pendingTasks sync.WaitGroup + context context.Context + cancel func() + logger *logp.Logger +} + +func (p *fetchRequest) fetch() ([]*ec2Instance, error) { + p.dispatch(p.fetchAllPages) + + // Only fetch future pages when there are no longer requests in-flight from a previous page + p.pendingTasks.Wait() + + // Acquire the results lock to ensure memory + // consistency between the last write and this read + p.resultsLock.Lock() + defer p.resultsLock.Unlock() + + // Since everything is async we have to retrieve any errors that occurred from here + if len(p.errs) > 0 { + return nil, multierr.Combine(p.errs...) + } + + return p.ec2Instances, nil +} + +func (p *fetchRequest) fetchAllPages() { + // Keep fetching pages unless we're stopped OR there are no pages left + for { + select { + case <-p.context.Done(): + p.logger.Debug(logSelector, "done fetching EC2 instances, context cancelled") + return + default: + if !p.fetchNextPage() { + p.logger.Debug(logSelector, "fetched all EC2 instances") + return + } + p.logger.Debug(logSelector, "fetched EC2 instance") + } + } +} + +func (p *fetchRequest) fetchNextPage() (more bool) { + success := p.paginator.Next(p.context) + + if success { + for _, reservation := range p.paginator.CurrentPage().Reservations { + for _, instance := range reservation.Instances { + p.dispatch(func() { p.fetchInstances(instance) }) + } + } + } + + if p.paginator.Err() != nil { + p.recordErrResult(p.paginator.Err()) + } + + return success +} + +// dispatch runs the given func in a new goroutine, properly throttling requests +// with the taskPool and also managing the pendingTasks waitGroup to ensure all +// results are accumulated. +func (p *fetchRequest) dispatch(fn func()) { + p.pendingTasks.Add(1) + + go func() { + slot := p.taskPool.Get() + defer p.taskPool.Put(slot) + defer p.pendingTasks.Done() + + fn() + }() +} + +func (p *fetchRequest) fetchInstances(instance ec2.Instance) { + describeInstancesInput := &ec2.DescribeInstancesInput{InstanceIds: []string{awsauto.SafeStrp(instance.InstanceId)}} + req := p.client.DescribeInstancesRequest(describeInstancesInput) + listen := ec2.NewDescribeInstancesPaginator(req) + + if listen.Err() != nil { + p.recordErrResult(listen.Err()) + } + + for { + select { + case <-p.context.Done(): + return + default: + if !listen.Next(p.context) { + return + } + + for _, reservation := range listen.CurrentPage().Reservations { + for _, instance := range reservation.Instances { + p.recordGoodResult(instance) + } + } + } + + } +} + +func (p *fetchRequest) recordGoodResult(instance ec2.Instance) { + p.resultsLock.Lock() + defer p.resultsLock.Unlock() + + p.ec2Instances = append(p.ec2Instances, &ec2Instance{instance, logp.NewLogger(logSelector)}) +} + +func (p *fetchRequest) recordErrResult(err error) { + p.resultsLock.Lock() + defer p.resultsLock.Unlock() + + p.errs = append(p.errs, err) + + p.cancel() +} diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go index e4673404274..28174b762f0 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go @@ -1 +1,149 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + package ec2 + +import ( + "github.com/aws/aws-sdk-go-v2/service/ec2" + "github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface" + "github.com/gofrs/uuid" + "github.com/pkg/errors" + + awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" + awscommon "github.com/elastic/beats/x-pack/libbeat/common/aws" + + "github.com/elastic/beats/libbeat/autodiscover" + "github.com/elastic/beats/libbeat/autodiscover/template" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/bus" + "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/logp" +) + +func init() { + autodiscover.Registry.AddProvider("aws_ec2", AutodiscoverBuilder) +} + +// Provider implements autodiscover provider for aws EC2s. +type Provider struct { + config *awsauto.Config + bus bus.Bus + builders autodiscover.Builders + appenders autodiscover.Appenders + templates *template.Mapper + startListener bus.Listener + stopListener bus.Listener + watcher *watcher + uuid uuid.UUID +} + +// AutodiscoverBuilder is the main builder for this provider. +func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodiscover.Provider, error) { + cfgwarn.Experimental("aws_ec2 autodiscover is experimental") + + config := awsauto.DefaultConfig() + err := c.Unpack(&config) + if err != nil { + return nil, err + } + + var clients []ec2iface.ClientAPI + for _, region := range config.Regions { + awsCfg, err := awscommon.GetAWSCredentials( + awscommon.ConfigAWS{ + AccessKeyID: config.AWSConfig.AccessKeyID, + SecretAccessKey: config.AWSConfig.SecretAccessKey, + SessionToken: config.AWSConfig.SessionToken, + ProfileName: config.AWSConfig.ProfileName, + }) + if err != nil { + logp.Error(errors.Wrap(err, "error loading AWS config for aws_ec2 autodiscover provider")) + } + awsCfg.Region = region + clients = append(clients, ec2.New(awsCfg)) + } + + return internalBuilder(uuid, bus, config, newAPIFetcher(clients)) +} + +// internalBuilder is mainly intended for testing via mocks and stubs. +// it can be configured to use a fetcher that doesn't actually hit the AWS API. +func internalBuilder(uuid uuid.UUID, bus bus.Bus, config *awsauto.Config, fetcher fetcher) (*Provider, error) { + mapper, err := template.NewConfigMapper(config.Templates) + if err != nil { + return nil, err + } + + builders, err := autodiscover.NewBuilders(config.Builders, nil) + if err != nil { + return nil, err + } + + appenders, err := autodiscover.NewAppenders(config.Appenders) + if err != nil { + return nil, err + } + + p := &Provider{ + config: config, + bus: bus, + builders: builders, + appenders: appenders, + templates: &mapper, + uuid: uuid, + } + + p.watcher = newWatcher( + fetcher, + config.Period, + p.onWatcherStart, + p.onWatcherStop, + ) + + return p, nil +} + +// Start the autodiscover process. +func (p *Provider) Start() { + p.watcher.start() +} + +// Stop the autodiscover process. +func (p *Provider) Stop() { + p.watcher.stop() +} + +func (p *Provider) onWatcherStart(arn string, instance *ec2Instance) { + instanceMap := instance.toMap() + e := bus.Event{ + "start": true, + "provider": p.uuid, + "id": arn, + "host": instanceMap["host"], + "port": instanceMap["port"], + "meta": common.MapStr{ + "ec2": instance.toMap(), + "cloud": instance.toCloudMap(), + }, + } + + if configs := p.templates.GetConfig(e); configs != nil { + e["config"] = configs + } + p.appenders.Append(e) + p.bus.Publish(e) +} + +func (p *Provider) onWatcherStop(arn string) { + e := bus.Event{ + "stop": true, + "id": arn, + "provider": p.uuid, + } + p.bus.Publish(e) +} + +func (p *Provider) String() string { + return "aws_ec2" +} diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go index e4673404274..bd5caddc0af 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go @@ -1 +1,105 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + package ec2 + +import ( + "context" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/logp" +) + +type watcher struct { + // gen tracks changes we increment the 'generation' of each entry in the map. + gen uint64 + fetcher fetcher + onStart func(uuid string, lblMap *ec2Instance) + onStop func(uuid string) + done chan struct{} + ticker *time.Ticker + period time.Duration + ec2Instances map[string]uint64 + logger *logp.Logger +} + +func newWatcher( + fetcher fetcher, + period time.Duration, + onStart func(uuid string, lblMap *ec2Instance), + onStop func(uuid string)) *watcher { + return &watcher{ + fetcher: fetcher, + onStart: onStart, + onStop: onStop, + done: make(chan struct{}), + ticker: time.NewTicker(period), + period: period, + ec2Instances: map[string]uint64{}, + } +} + +func (w *watcher) start() { + go w.forever() +} + +func (w *watcher) stop() { + close(w.done) +} + +func (w *watcher) forever() { + for { + select { + case <-w.done: + w.ticker.Stop() + return + case <-w.ticker.C: + err := w.once() + if err != nil { + logp.Error(errors.Wrap(err, "error while fetching AWS ELBs")) + } + } + } +} + +// once executes the watch loop a single time. +// This is mostly useful for testing. +func (w *watcher) once() error { + ctx, cancelCtx := context.WithTimeout(context.Background(), w.period) + defer cancelCtx() // Always cancel to avoid leak + + fetchedEC2s, err := w.fetcher.fetch(ctx) + if err != nil { + return err + } + w.logger.Debug("autodiscover-ec2", "fetched %d ec2 instances from AWS for autodiscovery", len(fetchedEC2s)) + + oldGen := w.gen + w.gen++ + + // Increment the generation of all EC2s returned by the API request + for _, instance := range fetchedEC2s { + arn := instance.arn() + if _, exists := w.ec2Instances[arn]; !exists { + if w.onStart != nil { + w.onStart(arn, instance) + } + } + w.ec2Instances[arn] = w.gen + } + + // EC2s not seen in the API request get deleted + for uuid, entryGen := range w.ec2Instances { + if entryGen == oldGen { + if w.onStop != nil { + w.onStop(uuid) + delete(w.ec2Instances, uuid) + } + } + } + + return nil +} diff --git a/x-pack/libbeat/cmd/inject.go b/x-pack/libbeat/cmd/inject.go index fa5ccea9edb..4bd3e9b294e 100644 --- a/x-pack/libbeat/cmd/inject.go +++ b/x-pack/libbeat/cmd/inject.go @@ -13,6 +13,7 @@ import ( _ "github.com/elastic/beats/x-pack/libbeat/management" // register autodiscover providers + _ "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws/ec2" _ "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws/elb" ) From dfc62695420387b1b8581f590463e17d7a7406d6 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Wed, 27 Nov 2019 15:49:11 -0700 Subject: [PATCH 03/22] test with logp.Debug --- x-pack/libbeat/autodiscover/providers/aws/config.go | 1 - x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go | 12 +++++++----- .../libbeat/autodiscover/providers/aws/ec2/fetch.go | 9 ++++----- .../autodiscover/providers/aws/ec2/provider.go | 4 +--- .../libbeat/autodiscover/providers/aws/ec2/watch.go | 4 ++-- 5 files changed, 14 insertions(+), 16 deletions(-) diff --git a/x-pack/libbeat/autodiscover/providers/aws/config.go b/x-pack/libbeat/autodiscover/providers/aws/config.go index ce712c821b9..aef7bfd2f90 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/config.go +++ b/x-pack/libbeat/autodiscover/providers/aws/config.go @@ -29,7 +29,6 @@ type Config struct { Period time.Duration `config:"period" validate:"nonzero,required"` // AWS Specific autodiscover fields - Regions []string `config:"regions" validate:"required"` AWSConfig aws.ConfigAWS `config:",inline"` } diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go index 2e9692dbce0..7097574450a 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go @@ -5,7 +5,9 @@ package ec2 import ( + "fmt" "github.com/aws/aws-sdk-go-v2/service/ec2" + "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -14,24 +16,24 @@ import ( type ec2Instance struct { ec2Instance ec2.Instance - logger *logp.Logger } // toMap converts this ec2Instance into the form consumed as metadata in the autodiscovery process. func (i *ec2Instance) toMap() common.MapStr { instanceType, err := i.ec2Instance.InstanceType.MarshalValue() + fmt.Println("instance type = ", instanceType) if err != nil { - i.logger.Error("MarshalValue failed for instance type: ", err) + logp.Error(errors.Wrap(err, "MarshalValue failed for instance type: ")) } monitoringState, err := i.ec2Instance.Monitoring.State.MarshalValue() if err != nil { - i.logger.Error("MarshalValue failed for monitoring state: ", err) + logp.Error(errors.Wrap(err, "MarshalValue failed for monitoring state: ")) } architecture, err := i.ec2Instance.Architecture.MarshalValue() if err != nil { - i.logger.Error("MarshalValue failed for architecture: ", err) + logp.Error(errors.Wrap(err, "MarshalValue failed for architecture: ")) } m := common.MapStr{ @@ -80,7 +82,7 @@ func (i *ec2Instance) stateMap() (stateMap common.MapStr) { stateMap = common.MapStr{} nameString, err := state.Name.MarshalValue() if err != nil { - i.logger.Error("MarshalValue failed for instance state name: ", err) + logp.Error(errors.Wrap(err,"MarshalValue failed for instance state name: ")) } stateMap["name"] = nameString diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go index 60ae04fcee2..1b6f0c50b0a 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go @@ -113,7 +113,6 @@ type fetchRequest struct { pendingTasks sync.WaitGroup context context.Context cancel func() - logger *logp.Logger } func (p *fetchRequest) fetch() ([]*ec2Instance, error) { @@ -140,14 +139,14 @@ func (p *fetchRequest) fetchAllPages() { for { select { case <-p.context.Done(): - p.logger.Debug(logSelector, "done fetching EC2 instances, context cancelled") + logp.Debug(logSelector, "done fetching EC2 instances, context cancelled") return default: if !p.fetchNextPage() { - p.logger.Debug(logSelector, "fetched all EC2 instances") + logp.Debug(logSelector, "fetched all EC2 instances") return } - p.logger.Debug(logSelector, "fetched EC2 instance") + logp.Debug(logSelector, "fetched EC2 instance") } } } @@ -217,7 +216,7 @@ func (p *fetchRequest) recordGoodResult(instance ec2.Instance) { p.resultsLock.Lock() defer p.resultsLock.Unlock() - p.ec2Instances = append(p.ec2Instances, &ec2Instance{instance, logp.NewLogger(logSelector)}) + p.ec2Instances = append(p.ec2Instances, &ec2Instance{instance}) } func (p *fetchRequest) recordErrResult(err error) { diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go index 28174b762f0..a0f8b77f50b 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go @@ -120,10 +120,8 @@ func (p *Provider) onWatcherStart(arn string, instance *ec2Instance) { "start": true, "provider": p.uuid, "id": arn, - "host": instanceMap["host"], - "port": instanceMap["port"], "meta": common.MapStr{ - "ec2": instance.toMap(), + "ec2": instanceMap, "cloud": instance.toCloudMap(), }, } diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go index bd5caddc0af..685780432d3 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go @@ -29,7 +29,7 @@ type watcher struct { func newWatcher( fetcher fetcher, period time.Duration, - onStart func(uuid string, lblMap *ec2Instance), + onStart func(uuid string, instanceMap *ec2Instance), onStop func(uuid string)) *watcher { return &watcher{ fetcher: fetcher, @@ -59,7 +59,7 @@ func (w *watcher) forever() { case <-w.ticker.C: err := w.once() if err != nil { - logp.Error(errors.Wrap(err, "error while fetching AWS ELBs")) + logp.Error(errors.Wrap(err, "error while fetching AWS EC2s")) } } } From e5899f87e695a915b3c061e8f35b15dca25c452d Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Fri, 6 Dec 2019 14:15:38 -0700 Subject: [PATCH 04/22] Add comment to DefaultConfig function --- x-pack/libbeat/autodiscover/providers/aws/config.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/libbeat/autodiscover/providers/aws/config.go b/x-pack/libbeat/autodiscover/providers/aws/config.go index aef7bfd2f90..9641cb7695c 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/config.go +++ b/x-pack/libbeat/autodiscover/providers/aws/config.go @@ -13,7 +13,7 @@ import ( "github.com/elastic/beats/libbeat/common" ) -// Config for the aws_ec2 autodiscover provider. +// Config for all aws autodiscover providers. type Config struct { Type string `config:"type"` @@ -33,6 +33,7 @@ type Config struct { AWSConfig aws.ConfigAWS `config:",inline"` } +// DefaultConfig for all aws autodiscover providers. func DefaultConfig() *Config { return &Config{ Period: time.Minute, From c042a08d749f98ff02a7e02ea981f58e3561e3c5 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 17 Dec 2019 15:51:41 -0700 Subject: [PATCH 05/22] Change ec2 metadata mapping --- .../autodiscover/providers/aws/ec2/ec2.go | 49 +++---------------- .../autodiscover/providers/aws/ec2/fetch.go | 3 +- .../autodiscover/providers/aws/ec2/watch.go | 12 +++-- 3 files changed, 16 insertions(+), 48 deletions(-) diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go index 7097574450a..2d8e2534605 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go @@ -5,7 +5,6 @@ package ec2 import ( - "fmt" "github.com/aws/aws-sdk-go-v2/service/ec2" "github.com/pkg/errors" @@ -21,7 +20,6 @@ type ec2Instance struct { // toMap converts this ec2Instance into the form consumed as metadata in the autodiscovery process. func (i *ec2Instance) toMap() common.MapStr { instanceType, err := i.ec2Instance.InstanceType.MarshalValue() - fmt.Println("instance type = ", instanceType) if err != nil { logp.Error(errors.Wrap(err, "MarshalValue failed for instance type: ")) } @@ -37,13 +35,10 @@ func (i *ec2Instance) toMap() common.MapStr { } m := common.MapStr{ + "instance_id": awsauto.SafeStrp(i.ec2Instance.InstanceId), "image_id": awsauto.SafeStrp(i.ec2Instance.ImageId), "vpc_id": awsauto.SafeStrp(i.ec2Instance.VpcId), "subnet_id": awsauto.SafeStrp(i.ec2Instance.SubnetId), - "host_id": awsauto.SafeStrp(i.ec2Instance.Placement.HostId), - "group_name": awsauto.SafeStrp(i.ec2Instance.Placement.GroupName), - "arn": awsauto.SafeStrp(i.ec2Instance.IamInstanceProfile.Arn), - "instance_id": awsauto.SafeStrp(i.ec2Instance.IamInstanceProfile.Id), "type": instanceType, "private_ip": awsauto.SafeStrp(i.ec2Instance.PrivateIpAddress), "private_dns_name": awsauto.SafeStrp(i.ec2Instance.PrivateDnsName), @@ -54,8 +49,10 @@ func (i *ec2Instance) toMap() common.MapStr { "root_device_name": awsauto.SafeStrp(i.ec2Instance.RootDeviceName), "kernel_id": awsauto.SafeStrp(i.ec2Instance.KernelId), "state": i.stateMap(), - "state_reason": i.stateReasonMap(), - "tags": i.tagMap(), + } + + for _, tag := range i.ec2Instance.Tags { + m.Put("tags."+awsauto.SafeStrp(tag.Key), awsauto.SafeStrp(tag.Value)) } return m } @@ -67,52 +64,20 @@ func (i *ec2Instance) toCloudMap() common.MapStr { m["provider"] = "aws" // The region is just an AZ with the last character removed - m["region"] = availabilityZone[:len(availabilityZone)-2] + m["region"] = availabilityZone[:len(availabilityZone)-1] return m } -// arn returns a globally unique ID. In the case of an ec2Instance, that would be its listenerArn. -func (i *ec2Instance) arn() string { - return awsauto.SafeStrp(i.ec2Instance.IamInstanceProfile.Arn) -} - // stateMap converts the State part of the ec2 struct into a friendlier map with 'reason' and 'code' fields. func (i *ec2Instance) stateMap() (stateMap common.MapStr) { state := i.ec2Instance.State stateMap = common.MapStr{} nameString, err := state.Name.MarshalValue() if err != nil { - logp.Error(errors.Wrap(err,"MarshalValue failed for instance state name: ")) + logp.Error(errors.Wrap(err, "MarshalValue failed for instance state name: ")) } stateMap["name"] = nameString stateMap["code"] = state.Code - - stateReason := i.ec2Instance.StateReason - stateMap["state_reason"] = awsauto.SafeStrp(stateReason.Code) - stateMap[""] = awsauto.SafeStrp(stateReason.Message) return stateMap } - -// stateReasonMap converts the State Reason part of the ec2 struct into a friendlier map with 'reason' and 'code' fields. -func (i *ec2Instance) stateReasonMap() (stateReasonMap common.MapStr) { - stateReasonMap = common.MapStr{} - stateReason := i.ec2Instance.StateReason - stateReasonMap["code"] = awsauto.SafeStrp(stateReason.Code) - stateReasonMap["message"] = awsauto.SafeStrp(stateReason.Message) - return stateReasonMap -} - -// stateMap converts the State part of the ec2 struct into a friendlier map with 'reason' and 'code' fields. -func (i *ec2Instance) tagMap() (tagsMap []common.MapStr) { - tags := i.ec2Instance.Tags - tagsMap = []common.MapStr{} - tagPair := common.MapStr{} - for _, tag := range tags { - tagPair["key"] = awsauto.SafeStrp(tag.Key) - tagPair["value"] = awsauto.SafeStrp(tag.Value) - tagsMap = append(tagsMap, tagPair) - } - - return tagsMap -} diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go index 1b6f0c50b0a..5864fc9eaec 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go @@ -11,8 +11,9 @@ import ( "github.com/aws/aws-sdk-go-v2/service/ec2" "github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface" "go.uber.org/multierr" - awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" + "github.com/elastic/beats/libbeat/logp" + awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" ) const logSelector = "autodiscover-ec2-fetch" diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go index 685780432d3..bbb10a86f8a 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go @@ -11,6 +11,7 @@ import ( "github.com/pkg/errors" "github.com/elastic/beats/libbeat/logp" + awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" ) type watcher struct { @@ -39,6 +40,7 @@ func newWatcher( ticker: time.NewTicker(period), period: period, ec2Instances: map[string]uint64{}, + logger: logp.NewLogger("autodiscover-ec2-watcher"), } } @@ -75,20 +77,20 @@ func (w *watcher) once() error { if err != nil { return err } - w.logger.Debug("autodiscover-ec2", "fetched %d ec2 instances from AWS for autodiscovery", len(fetchedEC2s)) + w.logger.Debugf("autodiscover-ec2", "fetched %d ec2 instances from AWS for autodiscovery", len(fetchedEC2s)) oldGen := w.gen w.gen++ // Increment the generation of all EC2s returned by the API request for _, instance := range fetchedEC2s { - arn := instance.arn() - if _, exists := w.ec2Instances[arn]; !exists { + instanceId := awsauto.SafeStrp(instance.ec2Instance.InstanceId) + if _, exists := w.ec2Instances[instanceId]; !exists { if w.onStart != nil { - w.onStart(arn, instance) + w.onStart(instanceId, instance) } } - w.ec2Instances[arn] = w.gen + w.ec2Instances[instanceId] = w.gen } // EC2s not seen in the API request get deleted From 5662544dc7bfd5a6e3ee67bf9997f09ab85f295b Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 17 Dec 2019 16:00:41 -0700 Subject: [PATCH 06/22] Update changelog --- x-pack/metricbeat/metricbeat.yml | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/x-pack/metricbeat/metricbeat.yml b/x-pack/metricbeat/metricbeat.yml index 5bd19f3030c..97e70192327 100644 --- a/x-pack/metricbeat/metricbeat.yml +++ b/x-pack/metricbeat/metricbeat.yml @@ -163,3 +163,28 @@ processors: # This allows to enable 6.7 migration aliases #migration.6_to_7.enabled: true + +#============================== Autodiscover =================================== + +# Autodiscover allows you to detect changes in the system and spawn new modules +# as they happen. +logging.level: debug +metricbeat.autodiscover: +# List of enabled autodiscover providers + providers: + - type: aws_ec2 + period: 1m + regions: + - us-east-1 + credential_profile_name: elastic-beats + templates: + - condition: + equals: + meta.ec2.tags.created-by: "ks" + config: + - module: mysql + metricsets: ["status", "galera_status"] + period: 10s + hosts: ["root:password@tcp(${data.meta.ec2.public_ip}:3306)/"] + username: root + password: password From a4e7f4f0017132df880b2ed0fb6f8d476806e03a Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 17 Dec 2019 16:01:43 -0700 Subject: [PATCH 07/22] Update changelog --- CHANGELOG.next.asciidoc | 1 + x-pack/metricbeat/metricbeat.yml | 25 ------------------------- 2 files changed, 1 insertion(+), 25 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c17f0c442d8..c2b848e64b8 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -372,6 +372,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Refactor kubernetes autodiscover to enable different resource based discovery {pull}14738[14738] - Add `add_id` processor. {pull}14524[14524] - Enable TLS 1.3 in all beats. {pull}12973[12973] +- Add `aws_ec2` provider for autodiscover. {issue}12518[12518] {pull}14823[14823] *Auditbeat* diff --git a/x-pack/metricbeat/metricbeat.yml b/x-pack/metricbeat/metricbeat.yml index 97e70192327..5bd19f3030c 100644 --- a/x-pack/metricbeat/metricbeat.yml +++ b/x-pack/metricbeat/metricbeat.yml @@ -163,28 +163,3 @@ processors: # This allows to enable 6.7 migration aliases #migration.6_to_7.enabled: true - -#============================== Autodiscover =================================== - -# Autodiscover allows you to detect changes in the system and spawn new modules -# as they happen. -logging.level: debug -metricbeat.autodiscover: -# List of enabled autodiscover providers - providers: - - type: aws_ec2 - period: 1m - regions: - - us-east-1 - credential_profile_name: elastic-beats - templates: - - condition: - equals: - meta.ec2.tags.created-by: "ks" - config: - - module: mysql - metricsets: ["status", "galera_status"] - period: 10s - hosts: ["root:password@tcp(${data.meta.ec2.public_ip}:3306)/"] - username: root - password: password From 580c13aa622144db2497f0d256bc41dfef7cf88c Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 17 Dec 2019 16:05:01 -0700 Subject: [PATCH 08/22] change instanceId to instanceID --- x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go index bbb10a86f8a..946b02cbfa0 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go @@ -84,13 +84,13 @@ func (w *watcher) once() error { // Increment the generation of all EC2s returned by the API request for _, instance := range fetchedEC2s { - instanceId := awsauto.SafeStrp(instance.ec2Instance.InstanceId) - if _, exists := w.ec2Instances[instanceId]; !exists { + instanceID := awsauto.SafeStrp(instance.ec2Instance.InstanceId) + if _, exists := w.ec2Instances[instanceID]; !exists { if w.onStart != nil { - w.onStart(instanceId, instance) + w.onStart(instanceID, instance) } } - w.ec2Instances[instanceId] = w.gen + w.ec2Instances[instanceID] = w.gen } // EC2s not seen in the API request get deleted From f06521dcbaf849396d72b3ad0f3f3b47c4ca38e9 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Wed, 18 Dec 2019 15:30:43 -0700 Subject: [PATCH 09/22] Collect from all regions if regions is not specified --- .../libbeat/autodiscover/providers/aws/aws.go | 8 +- .../autodiscover/providers/aws/config.go | 2 +- .../autodiscover/providers/aws/ec2/ec2.go | 97 ++++++++++++++----- .../autodiscover/providers/aws/ec2/fetch.go | 4 +- .../providers/aws/ec2/fetch_test.go | 18 ++++ .../providers/aws/ec2/mock_ec2_client_test.go | 18 ++++ .../providers/aws/ec2/provider.go | 71 ++++++++------ .../autodiscover/providers/aws/ec2/watch.go | 2 +- .../autodiscover/providers/aws/elb/config.go | 41 -------- .../providers/aws/elb/lblistener.go | 19 +--- .../providers/aws/elb/provider.go | 10 +- .../providers/aws/elb/provider_test.go | 3 +- 12 files changed, 168 insertions(+), 125 deletions(-) create mode 100644 x-pack/libbeat/autodiscover/providers/aws/ec2/fetch_test.go create mode 100644 x-pack/libbeat/autodiscover/providers/aws/ec2/mock_ec2_client_test.go delete mode 100644 x-pack/libbeat/autodiscover/providers/aws/elb/config.go diff --git a/x-pack/libbeat/autodiscover/providers/aws/aws.go b/x-pack/libbeat/autodiscover/providers/aws/aws.go index 240ecb988ae..8647e574425 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/aws.go +++ b/x-pack/libbeat/autodiscover/providers/aws/aws.go @@ -4,14 +4,14 @@ package aws -// SafeStrp makes handling AWS *string types easier. +// SafeString makes handling AWS *string types easier. // The AWS lib never returns plain strings, always using pointers, probably for memory efficiency reasons. // This is a bit odd, because strings are just pointers into byte arrays, however this is the choice they've made. // This will return the plain version of the given string or an empty string if the pointer is null -func SafeStrp(strp *string) string { - if strp == nil { +func SafeString(str *string) string { + if str == nil { return "" } - return *strp + return *str } diff --git a/x-pack/libbeat/autodiscover/providers/aws/config.go b/x-pack/libbeat/autodiscover/providers/aws/config.go index 9641cb7695c..d0e20f33f92 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/config.go +++ b/x-pack/libbeat/autodiscover/providers/aws/config.go @@ -29,7 +29,7 @@ type Config struct { Period time.Duration `config:"period" validate:"nonzero,required"` // AWS Specific autodiscover fields - Regions []string `config:"regions" validate:"required"` + Regions []string `config:"regions"` AWSConfig aws.ConfigAWS `config:",inline"` } diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go index 2d8e2534605..2190723c6e6 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go @@ -19,52 +19,99 @@ type ec2Instance struct { // toMap converts this ec2Instance into the form consumed as metadata in the autodiscovery process. func (i *ec2Instance) toMap() common.MapStr { - instanceType, err := i.ec2Instance.InstanceType.MarshalValue() - if err != nil { - logp.Error(errors.Wrap(err, "MarshalValue failed for instance type: ")) - } - - monitoringState, err := i.ec2Instance.Monitoring.State.MarshalValue() - if err != nil { - logp.Error(errors.Wrap(err, "MarshalValue failed for monitoring state: ")) - } - architecture, err := i.ec2Instance.Architecture.MarshalValue() if err != nil { logp.Error(errors.Wrap(err, "MarshalValue failed for architecture: ")) } m := common.MapStr{ - "instance_id": awsauto.SafeStrp(i.ec2Instance.InstanceId), - "image_id": awsauto.SafeStrp(i.ec2Instance.ImageId), - "vpc_id": awsauto.SafeStrp(i.ec2Instance.VpcId), - "subnet_id": awsauto.SafeStrp(i.ec2Instance.SubnetId), - "type": instanceType, - "private_ip": awsauto.SafeStrp(i.ec2Instance.PrivateIpAddress), - "private_dns_name": awsauto.SafeStrp(i.ec2Instance.PrivateDnsName), - "public_ip": awsauto.SafeStrp(i.ec2Instance.PublicIpAddress), - "public_dns_name": awsauto.SafeStrp(i.ec2Instance.PublicDnsName), - "monitoring_state": monitoringState, - "architecture": architecture, - "root_device_name": awsauto.SafeStrp(i.ec2Instance.RootDeviceName), - "kernel_id": awsauto.SafeStrp(i.ec2Instance.KernelId), + "image": i.toImage(), + "vpc": i.toVpc(), + "subnet": i.toSubnet(), + "private": i.toPrivate(), + "public": i.toPublic(), + "monitoring": i.toMonitoringState(), + "kernel": i.toKernel(), "state": i.stateMap(), + "architecture": architecture, + "root_device_name": awsauto.SafeString(i.ec2Instance.RootDeviceName), } for _, tag := range i.ec2Instance.Tags { - m.Put("tags."+awsauto.SafeStrp(tag.Key), awsauto.SafeStrp(tag.Value)) + m.Put("tags."+awsauto.SafeString(tag.Key), awsauto.SafeString(tag.Value)) + } + return m +} + +func (i *ec2Instance) toImage() common.MapStr { + m := common.MapStr{} + m["id"] = awsauto.SafeString(i.ec2Instance.ImageId) + return m +} + +func (i *ec2Instance) toMonitoringState() common.MapStr { + monitoringState, err := i.ec2Instance.Monitoring.State.MarshalValue() + if err != nil { + logp.Error(errors.Wrap(err, "MarshalValue failed for monitoring state: ")) } + + m := common.MapStr{} + m["state"] = monitoringState + return m +} + +func (i *ec2Instance) toPrivate() common.MapStr { + m := common.MapStr{} + m["ip"] = awsauto.SafeString(i.ec2Instance.PrivateIpAddress) + m["dns_name"] = awsauto.SafeString(i.ec2Instance.PrivateDnsName) + return m +} + +func (i *ec2Instance) toPublic() common.MapStr { + m := common.MapStr{} + m["ip"] = awsauto.SafeString(i.ec2Instance.PublicIpAddress) + m["dns_name"] = awsauto.SafeString(i.ec2Instance.PublicDnsName) + return m +} + +func (i *ec2Instance) toVpc() common.MapStr { + m := common.MapStr{} + m["id"] = awsauto.SafeString(i.ec2Instance.VpcId) + return m +} + +func (i *ec2Instance) toSubnet() common.MapStr { + m := common.MapStr{} + m["id"] = awsauto.SafeString(i.ec2Instance.SubnetId) + return m +} + +func (i *ec2Instance) toKernel() common.MapStr { + m := common.MapStr{} + m["id"] = awsauto.SafeString(i.ec2Instance.KernelId) return m } func (i *ec2Instance) toCloudMap() common.MapStr { m := common.MapStr{} - availabilityZone := awsauto.SafeStrp(i.ec2Instance.Placement.AvailabilityZone) + availabilityZone := awsauto.SafeString(i.ec2Instance.Placement.AvailabilityZone) m["availability_zone"] = availabilityZone m["provider"] = "aws" // The region is just an AZ with the last character removed m["region"] = availabilityZone[:len(availabilityZone)-1] + + instance := common.MapStr{} + instance["id"] = awsauto.SafeString(i.ec2Instance.InstanceId) + m["instance"] = instance + + instanceType, err := i.ec2Instance.InstanceType.MarshalValue() + if err != nil { + logp.Error(errors.Wrap(err, "MarshalValue failed for instance type: ")) + } + machine := common.MapStr{} + machine["type"] = instanceType + m["machine"] = machine return m } diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go index 5864fc9eaec..a5f68c26060 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go @@ -74,7 +74,7 @@ func newAPIFetcher(clients []ec2iface.ClientAPI) fetcher { } // fetch attempts to request the full list of ec2Instance objects. -// It accomplishes this by fetching a page of load balancers, then one go routine +// It accomplishes this by fetching a page of EC2 instances, then one go routine // per listener API request. Each page of results has O(n)+1 perf since we need that // additional fetch per EC2. We let the goroutine scheduler sort things out, and use // a sync.Pool to limit the number of in-flight requests. @@ -186,7 +186,7 @@ func (p *fetchRequest) dispatch(fn func()) { } func (p *fetchRequest) fetchInstances(instance ec2.Instance) { - describeInstancesInput := &ec2.DescribeInstancesInput{InstanceIds: []string{awsauto.SafeStrp(instance.InstanceId)}} + describeInstancesInput := &ec2.DescribeInstancesInput{InstanceIds: []string{awsauto.SafeString(instance.InstanceId)}} req := p.client.DescribeInstancesRequest(describeInstancesInput) listen := ec2.NewDescribeInstancesPaginator(req) diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch_test.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch_test.go new file mode 100644 index 00000000000..084322e06e8 --- /dev/null +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch_test.go @@ -0,0 +1,18 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package ec2 + +import ( + "testing" + + "github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface" + "github.com/stretchr/testify/require" +) + +func Test_newAPIFetcher(t *testing.T) { + client := newMockEC2Client(0) + fetcher := newAPIFetcher([]ec2iface.ClientAPI{client}) + require.NotNil(t, fetcher) +} diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/mock_ec2_client_test.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/mock_ec2_client_test.go new file mode 100644 index 00000000000..66c9cf6c6c2 --- /dev/null +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/mock_ec2_client_test.go @@ -0,0 +1,18 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package ec2 + +import ( + "github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface" +) + +func newMockEC2Client(numResults int) mockEC2Client { + return mockEC2Client{numResults: numResults} +} + +type mockEC2Client struct { + ec2iface.ClientAPI + numResults int +} diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go index a0f8b77f50b..58bead6695b 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go @@ -5,20 +5,19 @@ package ec2 import ( + "context" "github.com/aws/aws-sdk-go-v2/service/ec2" "github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface" - "github.com/gofrs/uuid" - "github.com/pkg/errors" - - awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" - awscommon "github.com/elastic/beats/x-pack/libbeat/common/aws" - "github.com/elastic/beats/libbeat/autodiscover" "github.com/elastic/beats/libbeat/autodiscover/template" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/bus" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/logp" + awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" + awscommon "github.com/elastic/beats/x-pack/libbeat/common/aws" + "github.com/gofrs/uuid" + "github.com/pkg/errors" ) func init() { @@ -29,8 +28,6 @@ func init() { type Provider struct { config *awsauto.Config bus bus.Bus - builders autodiscover.Builders - appenders autodiscover.Appenders templates *template.Mapper startListener bus.Listener stopListener bus.Listener @@ -48,15 +45,29 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodis return nil, err } + awsCfg, err := awscommon.GetAWSCredentials( + awscommon.ConfigAWS{ + AccessKeyID: config.AWSConfig.AccessKeyID, + SecretAccessKey: config.AWSConfig.SecretAccessKey, + SessionToken: config.AWSConfig.SessionToken, + ProfileName: config.AWSConfig.ProfileName, + }) + + // Construct MetricSet with a full regions list if there is no region specified. + if config.Regions == nil { + // set default region to make initial aws api call + awsCfg.Region = "us-west-1" + svcEC2 := ec2.New(awsCfg) + completeRegionsList, err := getRegions(svcEC2) + if err != nil { + return nil, err + } + + config.Regions = completeRegionsList + } + var clients []ec2iface.ClientAPI for _, region := range config.Regions { - awsCfg, err := awscommon.GetAWSCredentials( - awscommon.ConfigAWS{ - AccessKeyID: config.AWSConfig.AccessKeyID, - SecretAccessKey: config.AWSConfig.SecretAccessKey, - SessionToken: config.AWSConfig.SessionToken, - ProfileName: config.AWSConfig.ProfileName, - }) if err != nil { logp.Error(errors.Wrap(err, "error loading AWS config for aws_ec2 autodiscover provider")) } @@ -75,21 +86,9 @@ func internalBuilder(uuid uuid.UUID, bus bus.Bus, config *awsauto.Config, fetche return nil, err } - builders, err := autodiscover.NewBuilders(config.Builders, nil) - if err != nil { - return nil, err - } - - appenders, err := autodiscover.NewAppenders(config.Appenders) - if err != nil { - return nil, err - } - p := &Provider{ config: config, bus: bus, - builders: builders, - appenders: appenders, templates: &mapper, uuid: uuid, } @@ -115,13 +114,12 @@ func (p *Provider) Stop() { } func (p *Provider) onWatcherStart(arn string, instance *ec2Instance) { - instanceMap := instance.toMap() e := bus.Event{ "start": true, "provider": p.uuid, "id": arn, "meta": common.MapStr{ - "ec2": instanceMap, + "ec2": instance.toMap(), "cloud": instance.toCloudMap(), }, } @@ -129,7 +127,6 @@ func (p *Provider) onWatcherStart(arn string, instance *ec2Instance) { if configs := p.templates.GetConfig(e); configs != nil { e["config"] = configs } - p.appenders.Append(e) p.bus.Publish(e) } @@ -145,3 +142,17 @@ func (p *Provider) onWatcherStop(arn string) { func (p *Provider) String() string { return "aws_ec2" } + +func getRegions(svc ec2iface.ClientAPI) (completeRegionsList []string, err error) { + input := &ec2.DescribeRegionsInput{} + req := svc.DescribeRegionsRequest(input) + output, err := req.Send(context.TODO()) + if err != nil { + err = errors.Wrap(err, "Failed DescribeRegions") + return + } + for _, region := range output.Regions { + completeRegionsList = append(completeRegionsList, *region.RegionName) + } + return +} diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go index 946b02cbfa0..223d130ec29 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go @@ -84,7 +84,7 @@ func (w *watcher) once() error { // Increment the generation of all EC2s returned by the API request for _, instance := range fetchedEC2s { - instanceID := awsauto.SafeStrp(instance.ec2Instance.InstanceId) + instanceID := awsauto.SafeString(instance.ec2Instance.InstanceId) if _, exists := w.ec2Instances[instanceID]; !exists { if w.onStart != nil { w.onStart(instanceID, instance) diff --git a/x-pack/libbeat/autodiscover/providers/aws/elb/config.go b/x-pack/libbeat/autodiscover/providers/aws/elb/config.go deleted file mode 100644 index 0eb8fb73c0c..00000000000 --- a/x-pack/libbeat/autodiscover/providers/aws/elb/config.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package elb - -import ( - "time" - - "github.com/elastic/beats/x-pack/libbeat/common/aws" - - "github.com/elastic/beats/libbeat/autodiscover/template" - "github.com/elastic/beats/libbeat/common" -) - -// Config for the aws_elb autodiscover provider. -type Config struct { - Type string `config:"type"` - - // Standard autodiscover fields. - - // Hints are currently not supported, but may be implemented in a later release - HintsEnabled bool `config:"hints.enabled"` - Builders []*common.Config `config:"builders"` - Appenders []*common.Config `config:"appenders"` - Templates template.MapperSettings `config:"templates"` - - // Period defines how often to poll the AWS API. - Period time.Duration `config:"period" validate:"nonzero,required"` - - // AWS Specific autodiscover fields - - Regions []string `config:"regions" validate:"required"` - AWSConfig aws.ConfigAWS `config:",inline"` -} - -func defaultConfig() *Config { - return &Config{ - Period: time.Minute, - } -} diff --git a/x-pack/libbeat/autodiscover/providers/aws/elb/lblistener.go b/x-pack/libbeat/autodiscover/providers/aws/elb/lblistener.go index 94e49867781..b673113872d 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/elb/lblistener.go +++ b/x-pack/libbeat/autodiscover/providers/aws/elb/lblistener.go @@ -8,6 +8,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2" "github.com/elastic/beats/libbeat/common" + awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" ) // lbListener is a tuple type representing an elasticloadbalancingv2.Listener and its associated elasticloadbalancingv2.LoadBalancer. @@ -21,8 +22,8 @@ func (l *lbListener) toMap() common.MapStr { // We fully spell out listener_arn to avoid confusion with the ARN for the whole ELB m := common.MapStr{ "listener_arn": l.listener.ListenerArn, - "load_balancer_arn": safeStrp(l.lb.LoadBalancerArn), - "host": safeStrp(l.lb.DNSName), + "load_balancer_arn": awsauto.SafeString(l.lb.LoadBalancerArn), + "host": awsauto.SafeString(l.lb.DNSName), "protocol": l.listener.Protocol, "type": string(l.lb.Type), "scheme": l.lb.Scheme, @@ -31,7 +32,7 @@ func (l *lbListener) toMap() common.MapStr { "state": l.stateMap(), "ip_address_type": string(l.lb.IpAddressType), "security_groups": l.lb.SecurityGroups, - "vpc_id": safeStrp(l.lb.VpcId), + "vpc_id": awsauto.SafeString(l.lb.VpcId), "ssl_policy": l.listener.SslPolicy, } @@ -42,18 +43,6 @@ func (l *lbListener) toMap() common.MapStr { return m } -// safeStrp makes handling AWS *string types easier. -// The AWS lib never returns plain strings, always using pointers, probably for memory efficiency reasons. -// This is a bit odd, because strings are just pointers into byte arrays, however this is the choice they've made. -// This will return the plain version of the given string or an empty string if the pointer is null -func safeStrp(strp *string) string { - if strp == nil { - return "" - } - - return *strp -} - func (l *lbListener) toCloudMap() common.MapStr { m := common.MapStr{} diff --git a/x-pack/libbeat/autodiscover/providers/aws/elb/provider.go b/x-pack/libbeat/autodiscover/providers/aws/elb/provider.go index bad7fae4b15..60fce5cbc52 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/elb/provider.go +++ b/x-pack/libbeat/autodiscover/providers/aws/elb/provider.go @@ -7,8 +7,6 @@ package elb import ( "context" - awscommon "github.com/elastic/beats/x-pack/libbeat/common/aws" - "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2" "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2/elasticloadbalancingv2iface" "github.com/gofrs/uuid" @@ -19,6 +17,8 @@ import ( "github.com/elastic/beats/libbeat/common/bus" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/logp" + awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" + awscommon "github.com/elastic/beats/x-pack/libbeat/common/aws" ) func init() { @@ -27,7 +27,7 @@ func init() { // Provider implements autodiscover provider for aws ELBs. type Provider struct { - config *Config + config *awsauto.Config bus bus.Bus builders autodiscover.Builders appenders autodiscover.Appenders @@ -42,7 +42,7 @@ type Provider struct { func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodiscover.Provider, error) { cfgwarn.Experimental("aws_elb autodiscover is experimental") - config := defaultConfig() + config := awsauto.DefaultConfig() err := c.Unpack(&config) if err != nil { return nil, err @@ -68,7 +68,7 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodis // internalBuilder is mainly intended for testing via mocks and stubs. // it can be configured to use a fetcher that doesn't actually hit the AWS API. -func internalBuilder(uuid uuid.UUID, bus bus.Bus, config *Config, fetcher fetcher) (*Provider, error) { +func internalBuilder(uuid uuid.UUID, bus bus.Bus, config *awsauto.Config, fetcher fetcher) (*Provider, error) { mapper, err := template.NewConfigMapper(config.Templates) if err != nil { return nil, err diff --git a/x-pack/libbeat/autodiscover/providers/aws/elb/provider_test.go b/x-pack/libbeat/autodiscover/providers/aws/elb/provider_test.go index 894d1188c63..086d6a3ab15 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/elb/provider_test.go +++ b/x-pack/libbeat/autodiscover/providers/aws/elb/provider_test.go @@ -16,6 +16,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/bus" + awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" ) type testEventAccumulator struct { @@ -65,7 +66,7 @@ func Test_internalBuilder(t *testing.T) { fetcher := newMockFetcher(lbls, nil) pBus := bus.New("test") - cfg := &Config{ + cfg := &awsauto.Config{ Regions: []string{"us-east-1a", "us-west-1b"}, Period: time.Nanosecond, } From 3a5fa25382bcddbdceaa2ffe73501c6bf6bd6c28 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Wed, 18 Dec 2019 16:49:15 -0700 Subject: [PATCH 10/22] Add GetRegions into aws_elb provider --- .../libbeat/autodiscover/providers/aws/aws.go | 23 +++++++++++++++++ .../autodiscover/providers/aws/ec2/fetch.go | 10 ++++---- .../providers/aws/ec2/provider.go | 22 +++------------- .../autodiscover/providers/aws/elb/fetch.go | 12 ++++----- .../providers/aws/elb/fetch_test.go | 3 +-- .../providers/aws/elb/provider.go | 25 ++++++++++++++++--- 6 files changed, 61 insertions(+), 34 deletions(-) diff --git a/x-pack/libbeat/autodiscover/providers/aws/aws.go b/x-pack/libbeat/autodiscover/providers/aws/aws.go index 8647e574425..413866acc9e 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/aws.go +++ b/x-pack/libbeat/autodiscover/providers/aws/aws.go @@ -4,6 +4,14 @@ package aws +import ( + "context" + + "github.com/aws/aws-sdk-go-v2/service/ec2" + "github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface" + "github.com/pkg/errors" +) + // SafeString makes handling AWS *string types easier. // The AWS lib never returns plain strings, always using pointers, probably for memory efficiency reasons. // This is a bit odd, because strings are just pointers into byte arrays, however this is the choice they've made. @@ -15,3 +23,18 @@ func SafeString(str *string) string { return *str } + +// GetRegions makes DescribeRegions API call to list all regions from AWS +func GetRegions(svc ec2iface.ClientAPI) (completeRegionsList []string, err error) { + input := &ec2.DescribeRegionsInput{} + req := svc.DescribeRegionsRequest(input) + output, err := req.Send(context.TODO()) + if err != nil { + err = errors.Wrap(err, "Failed DescribeRegions") + return + } + for _, region := range output.Regions { + completeRegionsList = append(completeRegionsList, *region.RegionName) + } + return +} diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go index a5f68c26060..6f1e6fd38fb 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go @@ -16,8 +16,6 @@ import ( awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" ) -const logSelector = "autodiscover-ec2-fetch" - // fetcher is an interface that can fetch a list of ec2Instance objects without pagination being necessary. type fetcher interface { fetch(ctx context.Context) ([]*ec2Instance, error) @@ -91,6 +89,7 @@ func (f *apiFetcher) fetch(ctx context.Context) ([]*ec2Instance, error) { taskPool: sync.Pool{}, context: ctx, cancel: cancel, + logger: logp.NewLogger("autodiscover-ec2-fetch"), } // Limit concurrency against the AWS API by creating a pool of objects @@ -114,6 +113,7 @@ type fetchRequest struct { pendingTasks sync.WaitGroup context context.Context cancel func() + logger *logp.Logger } func (p *fetchRequest) fetch() ([]*ec2Instance, error) { @@ -140,14 +140,14 @@ func (p *fetchRequest) fetchAllPages() { for { select { case <-p.context.Done(): - logp.Debug(logSelector, "done fetching EC2 instances, context cancelled") + p.logger.Debug("done fetching EC2 instances, context cancelled") return default: if !p.fetchNextPage() { - logp.Debug(logSelector, "fetched all EC2 instances") + p.logger.Debug("fetched all EC2 instances") return } - logp.Debug(logSelector, "fetched EC2 instance") + p.logger.Debug("fetched EC2 instance") } } } diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go index 58bead6695b..10ae6f19f9d 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go @@ -5,9 +5,11 @@ package ec2 import ( - "context" "github.com/aws/aws-sdk-go-v2/service/ec2" "github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface" + "github.com/gofrs/uuid" + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/autodiscover" "github.com/elastic/beats/libbeat/autodiscover/template" "github.com/elastic/beats/libbeat/common" @@ -16,8 +18,6 @@ import ( "github.com/elastic/beats/libbeat/logp" awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" awscommon "github.com/elastic/beats/x-pack/libbeat/common/aws" - "github.com/gofrs/uuid" - "github.com/pkg/errors" ) func init() { @@ -58,7 +58,7 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodis // set default region to make initial aws api call awsCfg.Region = "us-west-1" svcEC2 := ec2.New(awsCfg) - completeRegionsList, err := getRegions(svcEC2) + completeRegionsList, err := awsauto.GetRegions(svcEC2) if err != nil { return nil, err } @@ -142,17 +142,3 @@ func (p *Provider) onWatcherStop(arn string) { func (p *Provider) String() string { return "aws_ec2" } - -func getRegions(svc ec2iface.ClientAPI) (completeRegionsList []string, err error) { - input := &ec2.DescribeRegionsInput{} - req := svc.DescribeRegionsRequest(input) - output, err := req.Send(context.TODO()) - if err != nil { - err = errors.Wrap(err, "Failed DescribeRegions") - return - } - for _, region := range output.Regions { - completeRegionsList = append(completeRegionsList, *region.RegionName) - } - return -} diff --git a/x-pack/libbeat/autodiscover/providers/aws/elb/fetch.go b/x-pack/libbeat/autodiscover/providers/aws/elb/fetch.go index 1cdb8e89984..33a7e8bb56a 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/elb/fetch.go +++ b/x-pack/libbeat/autodiscover/providers/aws/elb/fetch.go @@ -15,8 +15,6 @@ import ( "github.com/elastic/beats/libbeat/logp" ) -const logSelector = "autodiscover-elb-fetch" - // fetcher is an interface that can fetch a list of lbListener (load balancer + listener) objects without pagination being necessary. type fetcher interface { fetch(ctx context.Context) ([]*lbListener, error) @@ -64,7 +62,7 @@ type apiFetcher struct { client elasticloadbalancingv2iface.ClientAPI } -func newAPIFetcher(ctx context.Context, clients []elasticloadbalancingv2iface.ClientAPI) fetcher { +func newAPIFetcher(clients []elasticloadbalancingv2iface.ClientAPI) fetcher { fetchers := make([]fetcher, len(clients)) for idx, client := range clients { fetchers[idx] = &apiFetcher{client} @@ -89,6 +87,7 @@ func (f *apiFetcher) fetch(ctx context.Context) ([]*lbListener, error) { taskPool: sync.Pool{}, context: ctx, cancel: cancel, + logger: logp.NewLogger("autodiscover-elb-fetch"), } // Limit concurrency against the AWS API by creating a pool of objects @@ -112,6 +111,7 @@ type fetchRequest struct { pendingTasks sync.WaitGroup context context.Context cancel func() + logger *logp.Logger } func (p *fetchRequest) fetch() ([]*lbListener, error) { @@ -138,14 +138,14 @@ func (p *fetchRequest) fetchAllPages() { for { select { case <-p.context.Done(): - logp.Debug(logSelector, "done fetching ELB pages, context cancelled") + p.logger.Debug("done fetching ELB pages, context cancelled") return default: if !p.fetchNextPage() { - logp.Debug(logSelector, "fetched all ELB pages") + p.logger.Debug("fetched all ELB pages") return } - logp.Debug(logSelector, "fetched ELB page") + p.logger.Debug("fetched ELB page") } } } diff --git a/x-pack/libbeat/autodiscover/providers/aws/elb/fetch_test.go b/x-pack/libbeat/autodiscover/providers/aws/elb/fetch_test.go index 747c9738ed3..c1eadd70a3a 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/elb/fetch_test.go +++ b/x-pack/libbeat/autodiscover/providers/aws/elb/fetch_test.go @@ -5,7 +5,6 @@ package elb import ( - "context" "testing" "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2/elasticloadbalancingv2iface" @@ -14,6 +13,6 @@ import ( func Test_newAPIFetcher(t *testing.T) { client := newMockELBClient(0) - fetcher := newAPIFetcher(context.TODO(), []elasticloadbalancingv2iface.ClientAPI{client}) + fetcher := newAPIFetcher([]elasticloadbalancingv2iface.ClientAPI{client}) require.NotNil(t, fetcher) } diff --git a/x-pack/libbeat/autodiscover/providers/aws/elb/provider.go b/x-pack/libbeat/autodiscover/providers/aws/elb/provider.go index 60fce5cbc52..7b039cfc6e5 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/elb/provider.go +++ b/x-pack/libbeat/autodiscover/providers/aws/elb/provider.go @@ -5,8 +5,7 @@ package elb import ( - "context" - + "github.com/aws/aws-sdk-go-v2/service/ec2" "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2" "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2/elasticloadbalancingv2iface" "github.com/gofrs/uuid" @@ -48,6 +47,26 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodis return nil, err } + awsCfg, err := awscommon.GetAWSCredentials(awscommon.ConfigAWS{ + AccessKeyID: config.AWSConfig.AccessKeyID, + SecretAccessKey: config.AWSConfig.SecretAccessKey, + SessionToken: config.AWSConfig.SessionToken, + ProfileName: config.AWSConfig.ProfileName, + }) + + // Construct MetricSet with a full regions list if there is no region specified. + if config.Regions == nil { + // set default region to make initial aws api call + awsCfg.Region = "us-west-1" + svcEC2 := ec2.New(awsCfg) + completeRegionsList, err := awsauto.GetRegions(svcEC2) + if err != nil { + return nil, err + } + + config.Regions = completeRegionsList + } + var clients []elasticloadbalancingv2iface.ClientAPI for _, region := range config.Regions { awsCfg, err := awscommon.GetAWSCredentials(awscommon.ConfigAWS{ @@ -63,7 +82,7 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodis clients = append(clients, elasticloadbalancingv2.New(awsCfg)) } - return internalBuilder(uuid, bus, config, newAPIFetcher(context.TODO(), clients)) + return internalBuilder(uuid, bus, config, newAPIFetcher(clients)) } // internalBuilder is mainly intended for testing via mocks and stubs. From fef6fde314d3c9773dff4f7508c0dfabf2e76006 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Wed, 18 Dec 2019 20:23:21 -0700 Subject: [PATCH 11/22] Add aws_ec2 provider into autodiscover doc --- libbeat/docs/shared-autodiscover.asciidoc | 50 +++++++++++++++++-- .../docs/autodiscover-aws-ec2-config.asciidoc | 25 ++++++++++ .../autodiscover/providers/aws/config.go | 12 +---- .../providers/aws/elb/provider.go | 12 ----- 4 files changed, 72 insertions(+), 27 deletions(-) create mode 100644 metricbeat/docs/autodiscover-aws-ec2-config.asciidoc diff --git a/libbeat/docs/shared-autodiscover.asciidoc b/libbeat/docs/shared-autodiscover.asciidoc index 8768d7252aa..ac61847a08e 100644 --- a/libbeat/docs/shared-autodiscover.asciidoc +++ b/libbeat/docs/shared-autodiscover.asciidoc @@ -328,13 +328,53 @@ include::../../x-pack/libbeat/docs/aws-credentials-config.asciidoc[] endif::autodiscoverAWSELB[] +ifdef::autodiscoverAWSEC2[] +[float] +===== Amazon EC2s + +*Note: This provider is experimental* + +The Amazon EC2 autodiscover provider discovers https://aws.amazon.com/ec2/[EC2 instances]. +This is useful for users to launch Metricbeat modules to monitor services running on AWS EC2 instances. +For example, to gather MySQL metrics from mysql servers running on EC2 instances with specific tag `service: mysql`. + +This provider will load AWS credentials using the standard AWS environment variables and shared credentials files +see https://docs.aws.amazon.com/general/latest/gr/aws-access-keys-best-practices.html[Best Practices for Managing AWS Access Keys] +for more information. If you do not wish to use these, you may explicitly set the `access_key_id` and +`secret_access_key` variables. + +These are the available fields during within config templating. +The `ec2.*` fields and `cloud.*` fields will be available on each emitted event. + +* cloud.availability_zone +* cloud.instance.id +* cloud.machine.type +* cloud.provider +* cloud.region + +* ec2.architecture +* ec2.image.id +* ec2.kernel.id +* ec2.monitoring.state +* ec2.private.dns_name +* ec2.private.ip +* ec2.public.dns_name +* ec2.public.ip +* ec2.root_device_name +* ec2.state.code +* ec2.state.name +* ec2.subnet.id +* ec2.tags +* ec2.vpc.id + +include::../../{beatname_lc}/docs/autodiscover-aws-ec2-config.asciidoc[] -ifdef::autodiscoverHints[] -[[configuration-autodiscover-hints]] -=== Hints based autodiscover +This autodiscover provider takes our standard <>. + +[id="aws-credentials-config"] +include::../../x-pack/libbeat/docs/aws-credentials-config.asciidoc[] -include::../../{beatname_lc}/docs/autodiscover-hints.asciidoc[] -endif::autodiscoverHints[] +endif::autodiscoverAWSEC2[] [[configuration-autodiscover-advanced]] === Advanced usage diff --git a/metricbeat/docs/autodiscover-aws-ec2-config.asciidoc b/metricbeat/docs/autodiscover-aws-ec2-config.asciidoc new file mode 100644 index 00000000000..261a2af0163 --- /dev/null +++ b/metricbeat/docs/autodiscover-aws-ec2-config.asciidoc @@ -0,0 +1,25 @@ +{beatname_uc} supports templates for modules: + +["source","yaml",subs="attributes"] +------------------------------------------------------------------------------------- +metricbeat.autodiscover: + providers: + - type: aws_ec2 + period: 1m + credential_profile_name: elastic-beats + templates: + - condition: + equals: + meta.ec2.tags.service: "mysql" + config: + - module: mysql + metricsets: ["status", "galera_status"] + period: 10s + hosts: ["root:password@tcp(${data.meta.ec2.public.ip}:3306)/"] + username: root + password: password +------------------------------------------------------------------------------------- + +This autodiscover provider takes our standard AWS credentials options. +With this configuration, `mysql` metricbeat module will be launched for all EC2 +instances that have `service: mysql` as a tag. diff --git a/x-pack/libbeat/autodiscover/providers/aws/config.go b/x-pack/libbeat/autodiscover/providers/aws/config.go index d0e20f33f92..e3a0873734e 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/config.go +++ b/x-pack/libbeat/autodiscover/providers/aws/config.go @@ -10,20 +10,12 @@ import ( "github.com/elastic/beats/x-pack/libbeat/common/aws" "github.com/elastic/beats/libbeat/autodiscover/template" - "github.com/elastic/beats/libbeat/common" ) // Config for all aws autodiscover providers. type Config struct { - Type string `config:"type"` - - // Standard autodiscover fields. - - // Hints are currently not supported, but may be implemented in a later release - HintsEnabled bool `config:"hints.enabled"` - Builders []*common.Config `config:"builders"` - Appenders []*common.Config `config:"appenders"` - Templates template.MapperSettings `config:"templates"` + Type string `config:"type"` + Templates template.MapperSettings `config:"templates"` // Period defines how often to poll the AWS API. Period time.Duration `config:"period" validate:"nonzero,required"` diff --git a/x-pack/libbeat/autodiscover/providers/aws/elb/provider.go b/x-pack/libbeat/autodiscover/providers/aws/elb/provider.go index 7b039cfc6e5..b965f9ee1e2 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/elb/provider.go +++ b/x-pack/libbeat/autodiscover/providers/aws/elb/provider.go @@ -93,21 +93,9 @@ func internalBuilder(uuid uuid.UUID, bus bus.Bus, config *awsauto.Config, fetche return nil, err } - builders, err := autodiscover.NewBuilders(config.Builders, nil) - if err != nil { - return nil, err - } - - appenders, err := autodiscover.NewAppenders(config.Appenders) - if err != nil { - return nil, err - } - p := &Provider{ config: config, bus: bus, - builders: builders, - appenders: appenders, templates: &mapper, uuid: uuid, } From 543e18a9be7fa679aee3dbd4a4e4bd128058621f Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Wed, 18 Dec 2019 20:30:11 -0700 Subject: [PATCH 12/22] update autodiscover doc --- metricbeat/docs/configuring-howto.asciidoc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/metricbeat/docs/configuring-howto.asciidoc b/metricbeat/docs/configuring-howto.asciidoc index d532b8a5a3e..168dbfee9e6 100644 --- a/metricbeat/docs/configuring-howto.asciidoc +++ b/metricbeat/docs/configuring-howto.asciidoc @@ -79,6 +79,10 @@ include::{libbeat-dir}/shared-env-vars.asciidoc[] :autodiscoverHints: include::{libbeat-dir}/shared-autodiscover.asciidoc[] +:autodiscoverAWSEC2: +include::{libbeat-dir}/shared-autodiscover.asciidoc[] +:autodiscoverAWSEC2!: + :standalone: include::{libbeat-dir}/yaml.asciidoc[] :standalone!: From 4605f1f3440cbeb157076c27f390487d81295e09 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Thu, 19 Dec 2019 10:43:49 -0700 Subject: [PATCH 13/22] try to fix doc --- libbeat/docs/shared-autodiscover.asciidoc | 3 --- 1 file changed, 3 deletions(-) diff --git a/libbeat/docs/shared-autodiscover.asciidoc b/libbeat/docs/shared-autodiscover.asciidoc index ac61847a08e..fc12ee2dd88 100644 --- a/libbeat/docs/shared-autodiscover.asciidoc +++ b/libbeat/docs/shared-autodiscover.asciidoc @@ -371,9 +371,6 @@ include::../../{beatname_lc}/docs/autodiscover-aws-ec2-config.asciidoc[] This autodiscover provider takes our standard <>. -[id="aws-credentials-config"] -include::../../x-pack/libbeat/docs/aws-credentials-config.asciidoc[] - endif::autodiscoverAWSEC2[] [[configuration-autodiscover-advanced]] From 556830af0124ad9b153b7862ad8d78e7cb8d48cc Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Thu, 19 Dec 2019 14:16:25 -0700 Subject: [PATCH 14/22] Add unit tests --- libbeat/docs/shared-autodiscover.asciidoc | 8 ++ .../autodiscover/providers/aws/ec2/ec2.go | 6 +- .../providers/aws/ec2/mocks_test.go | 85 +++++++++++++++ .../providers/aws/ec2/provider.go | 8 +- .../providers/aws/ec2/provider_test.go | 102 ++++++++++++++++++ .../providers/aws/ec2/watch_test.go | 65 +++++++++++ .../providers/aws/elb/provider_test.go | 65 +++-------- .../providers/aws/test/provider.go | 55 ++++++++++ 8 files changed, 336 insertions(+), 58 deletions(-) create mode 100644 x-pack/libbeat/autodiscover/providers/aws/ec2/mocks_test.go create mode 100644 x-pack/libbeat/autodiscover/providers/aws/ec2/provider_test.go create mode 100644 x-pack/libbeat/autodiscover/providers/aws/ec2/watch_test.go create mode 100644 x-pack/libbeat/autodiscover/providers/aws/test/provider.go diff --git a/libbeat/docs/shared-autodiscover.asciidoc b/libbeat/docs/shared-autodiscover.asciidoc index fc12ee2dd88..0ecbefac4b3 100644 --- a/libbeat/docs/shared-autodiscover.asciidoc +++ b/libbeat/docs/shared-autodiscover.asciidoc @@ -328,6 +328,14 @@ include::../../x-pack/libbeat/docs/aws-credentials-config.asciidoc[] endif::autodiscoverAWSELB[] + +ifdef::autodiscoverHints[] +[[configuration-autodiscover-hints]] +=== Hints based autodiscover + +include::../../{beatname_lc}/docs/autodiscover-hints.asciidoc[] +endif::autodiscoverHints[] + ifdef::autodiscoverAWSEC2[] [float] ===== Amazon EC2s diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go index 2190723c6e6..3a81acb4d5a 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go @@ -43,6 +43,10 @@ func (i *ec2Instance) toMap() common.MapStr { return m } +func (i *ec2Instance) instanceID() string { + return awsauto.SafeString(i.ec2Instance.InstanceId) +} + func (i *ec2Instance) toImage() common.MapStr { m := common.MapStr{} m["id"] = awsauto.SafeString(i.ec2Instance.ImageId) @@ -102,7 +106,7 @@ func (i *ec2Instance) toCloudMap() common.MapStr { m["region"] = availabilityZone[:len(availabilityZone)-1] instance := common.MapStr{} - instance["id"] = awsauto.SafeString(i.ec2Instance.InstanceId) + instance["id"] = i.instanceID() m["instance"] = instance instanceType, err := i.ec2Instance.InstanceType.MarshalValue() diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/mocks_test.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/mocks_test.go new file mode 100644 index 00000000000..542ddf4925e --- /dev/null +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/mocks_test.go @@ -0,0 +1,85 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package ec2 + +import ( + "context" + "sync" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/ec2" +) + +// mockFetcher is a fetcher that returns a customizable list of results, useful for testing. +type mockFetcher struct { + ec2Instances []*ec2Instance + err error + lock sync.Mutex +} + +func newMockFetcher(lbListeners []*ec2Instance, err error) *mockFetcher { + return &mockFetcher{ec2Instances: lbListeners, err: err} +} + +func (f *mockFetcher) fetch(ctx context.Context) ([]*ec2Instance, error) { + f.lock.Lock() + defer f.lock.Unlock() + + result := make([]*ec2Instance, len(f.ec2Instances)) + copy(result, f.ec2Instances) + + return result, f.err +} + +func (f *mockFetcher) setEC2s(newEC2s []*ec2Instance) { + f.lock.Lock() + defer f.lock.Unlock() + + f.ec2Instances = newEC2s +} + +func (f *mockFetcher) setError(err error) { + f.lock.Lock() + defer f.lock.Unlock() + + f.ec2Instances = []*ec2Instance{} + f.err = err +} + +func fakeEC2Instance() *ec2Instance { + runningCode := int64(16) + coreCount := int64(1) + threadsPerCore := int64(1) + publicDNSName := "ec2-1-2-3-4.us-west-1.compute.amazonaws.com" + publicIP := "1.2.3.4" + privateDNSName := "ip-5-6-7-8.us-west-1.compute.internal" + privateIP := "5.6.7.8" + instanceID := "i-123" + + instance := ec2.Instance{ + InstanceId: aws.String(instanceID), + InstanceType: ec2.InstanceTypeT2Medium, + Placement: &ec2.Placement{ + AvailabilityZone: aws.String("us-west-1a"), + }, + ImageId: aws.String("image-123"), + State: &ec2.InstanceState{ + Name: ec2.InstanceStateNameRunning, + Code: &runningCode, + }, + Monitoring: &ec2.Monitoring{ + State: ec2.MonitoringStateDisabled, + }, + CpuOptions: &ec2.CpuOptions{ + CoreCount: &coreCount, + ThreadsPerCore: &threadsPerCore, + }, + PublicDnsName: &publicDNSName, + PublicIpAddress: &publicIP, + PrivateDnsName: &privateDNSName, + PrivateIpAddress: &privateIP, + } + return &ec2Instance{ec2Instance: instance} +} diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go index 10ae6f19f9d..bab0ccf4b79 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go @@ -113,11 +113,11 @@ func (p *Provider) Stop() { p.watcher.stop() } -func (p *Provider) onWatcherStart(arn string, instance *ec2Instance) { +func (p *Provider) onWatcherStart(instanceID string, instance *ec2Instance) { e := bus.Event{ "start": true, "provider": p.uuid, - "id": arn, + "id": instanceID, "meta": common.MapStr{ "ec2": instance.toMap(), "cloud": instance.toCloudMap(), @@ -130,10 +130,10 @@ func (p *Provider) onWatcherStart(arn string, instance *ec2Instance) { p.bus.Publish(e) } -func (p *Provider) onWatcherStop(arn string) { +func (p *Provider) onWatcherStop(instanceID string) { e := bus.Event{ "stop": true, - "id": arn, + "id": instanceID, "provider": p.uuid, } p.bus.Publish(e) diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider_test.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider_test.go new file mode 100644 index 00000000000..45ac81bb53b --- /dev/null +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider_test.go @@ -0,0 +1,102 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package ec2 + +import ( + "testing" + "time" + + "github.com/gofrs/uuid" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/bus" + awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" + "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws/test" +) + +func Test_internalBuilder(t *testing.T) { + instance := fakeEC2Instance() + instances := []*ec2Instance{instance} + fetcher := newMockFetcher(instances, nil) + pBus := bus.New("test") + + cfg := &awsauto.Config{ + Regions: []string{"us-east-1a", "us-west-1b"}, + Period: time.Nanosecond, + } + + uuid, _ := uuid.NewV4() + provider, err := internalBuilder(uuid, pBus, cfg, fetcher) + require.NoError(t, err) + + startListener := pBus.Subscribe("start") + stopListener := pBus.Subscribe("stop") + listenerDone := make(chan struct{}) + defer close(listenerDone) + + var events test.TestEventAccumulator + go func() { + for { + select { + case e := <-startListener.Events(): + events.Add(e) + case e := <-stopListener.Events(): + events.Add(e) + case <-listenerDone: + return + } + } + }() + + // Let run twice to ensure that duplicates don't create two start events + // Since we're turning a list of assets into a list of changes the second once() call should be a noop + provider.watcher.once() + provider.watcher.once() + events.WaitForNumEvents(t, 1, time.Second) + + assert.Equal(t, 1, events.Len()) + + expectedStartEvent := bus.Event{ + "id": instance.instanceID(), + "provider": uuid, + "start": true, + "meta": common.MapStr{ + "ec2": instance.toMap(), + "cloud": instance.toCloudMap(), + }, + } + + require.Equal(t, expectedStartEvent, events.Get()[0]) + + fetcher.setEC2s([]*ec2Instance{}) + + // Let run twice to ensure that duplicates don't cause an issue + provider.watcher.once() + provider.watcher.once() + events.WaitForNumEvents(t, 2, time.Second) + + require.Equal(t, 2, events.Len()) + + expectedStopEvent := bus.Event{ + "stop": true, + "id": awsauto.SafeString(instance.ec2Instance.InstanceId), + "provider": uuid, + } + + require.Equal(t, expectedStopEvent, events.Get()[1]) + + // Test that in an error situation nothing changes. + preErrorEventCount := events.Len() + fetcher.setError(errors.New("oops")) + + // Let run twice to ensure that duplicates don't cause an issue + provider.watcher.once() + provider.watcher.once() + + assert.Equal(t, preErrorEventCount, events.Len()) +} diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/watch_test.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/watch_test.go new file mode 100644 index 00000000000..83cae46efed --- /dev/null +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/watch_test.go @@ -0,0 +1,65 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package ec2 + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/stretchr/testify/assert" +) + +func TestWatchTicks(t *testing.T) { + instances := []*ec2Instance{fakeEC2Instance()} + + lock := sync.Mutex{} + var startUUIDs []string + var startEC2s []*ec2Instance + var stopUUIDs []string + + fetcher := newMockFetcher(instances, nil) + watcher := newWatcher( + fetcher, + time.Millisecond, + func(uuid string, lbListener *ec2Instance) { + lock.Lock() + defer lock.Unlock() + + startUUIDs = append(startUUIDs, uuid) + startEC2s = append(startEC2s, lbListener) + }, + func(uuid string) { + lock.Lock() + defer lock.Unlock() + + stopUUIDs = append(stopUUIDs, uuid) + }) + defer watcher.stop() + + // Run through 10 ticks + for i := 0; i < 10; i++ { + err := watcher.once() + require.NoError(t, err) + } + + // The instanceID is the unique identifier used. + instanceIDs := []string{*instances[0].ec2Instance.InstanceId} + + // Test that we've seen one ec2 start, but none stop + assert.Equal(t, instanceIDs, startUUIDs) + assert.Len(t, stopUUIDs, 0) + assert.Equal(t, instances, startEC2s) + + // Stop the ec2 and test that we see a single stop + // and no change to starts + fetcher.setEC2s(nil) + watcher.once() + + assert.Equal(t, instanceIDs, startUUIDs) + assert.Equal(t, instanceIDs, stopUUIDs) +} diff --git a/x-pack/libbeat/autodiscover/providers/aws/elb/provider_test.go b/x-pack/libbeat/autodiscover/providers/aws/elb/provider_test.go index 086d6a3ab15..d10ef699e27 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/elb/provider_test.go +++ b/x-pack/libbeat/autodiscover/providers/aws/elb/provider_test.go @@ -5,7 +5,6 @@ package elb import ( - "sync" "testing" "time" @@ -17,49 +16,9 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/bus" awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" + "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws/test" ) -type testEventAccumulator struct { - events []bus.Event - lock sync.Mutex -} - -func (tea *testEventAccumulator) add(e bus.Event) { - tea.lock.Lock() - defer tea.lock.Unlock() - - tea.events = append(tea.events, e) -} - -func (tea *testEventAccumulator) len() int { - tea.lock.Lock() - defer tea.lock.Unlock() - - return len(tea.events) -} - -func (tea *testEventAccumulator) get() []bus.Event { - tea.lock.Lock() - defer tea.lock.Unlock() - - res := make([]bus.Event, len(tea.events)) - copy(res, tea.events) - return res -} - -func (tea *testEventAccumulator) waitForNumEvents(t *testing.T, targetLen int, timeout time.Duration) { - start := time.Now() - - for time.Now().Sub(start) < timeout { - if tea.len() >= targetLen { - return - } - time.Sleep(time.Millisecond) - } - - t.Fatalf("Timed out waiting for num events to be %d", targetLen) -} - func Test_internalBuilder(t *testing.T) { lbl := fakeLbl() lbls := []*lbListener{lbl} @@ -80,14 +39,14 @@ func Test_internalBuilder(t *testing.T) { listenerDone := make(chan struct{}) defer close(listenerDone) - var events testEventAccumulator + var events test.TestEventAccumulator go func() { for { select { case e := <-startListener.Events(): - events.add(e) + events.Add(e) case e := <-stopListener.Events(): - events.add(e) + events.Add(e) case <-listenerDone: return } @@ -98,9 +57,9 @@ func Test_internalBuilder(t *testing.T) { // Since we're turning a list of assets into a list of changes the second once() call should be a noop provider.watcher.once() provider.watcher.once() - events.waitForNumEvents(t, 1, time.Second) + events.WaitForNumEvents(t, 1, time.Second) - assert.Equal(t, 1, events.len()) + assert.Equal(t, 1, events.Len()) expectedStartEvent := bus.Event{ "id": lbl.arn(), @@ -114,16 +73,16 @@ func Test_internalBuilder(t *testing.T) { }, } - require.Equal(t, expectedStartEvent, events.get()[0]) + require.Equal(t, expectedStartEvent, events.Get()[0]) fetcher.setLbls([]*lbListener{}) // Let run twice to ensure that duplicates don't cause an issue provider.watcher.once() provider.watcher.once() - events.waitForNumEvents(t, 2, time.Second) + events.WaitForNumEvents(t, 2, time.Second) - require.Equal(t, 2, events.len()) + require.Equal(t, 2, events.Len()) expectedStopEvent := bus.Event{ "stop": true, @@ -131,15 +90,15 @@ func Test_internalBuilder(t *testing.T) { "provider": uuid, } - require.Equal(t, expectedStopEvent, events.get()[1]) + require.Equal(t, expectedStopEvent, events.Get()[1]) // Test that in an error situation nothing changes. - preErrorEventCount := events.len() + preErrorEventCount := events.Len() fetcher.setError(errors.New("oops")) // Let run twice to ensure that duplicates don't cause an issue provider.watcher.once() provider.watcher.once() - assert.Equal(t, preErrorEventCount, events.len()) + assert.Equal(t, preErrorEventCount, events.Len()) } diff --git a/x-pack/libbeat/autodiscover/providers/aws/test/provider.go b/x-pack/libbeat/autodiscover/providers/aws/test/provider.go new file mode 100644 index 00000000000..9f08cb3bb27 --- /dev/null +++ b/x-pack/libbeat/autodiscover/providers/aws/test/provider.go @@ -0,0 +1,55 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package test + +import ( + "sync" + "testing" + "time" + + "github.com/elastic/beats/libbeat/common/bus" +) + +// TestEventAccumulator +type TestEventAccumulator struct { + events []bus.Event + lock sync.Mutex +} + +func (tea *TestEventAccumulator) Add(e bus.Event) { + tea.lock.Lock() + defer tea.lock.Unlock() + + tea.events = append(tea.events, e) +} + +func (tea *TestEventAccumulator) Len() int { + tea.lock.Lock() + defer tea.lock.Unlock() + + return len(tea.events) +} + +func (tea *TestEventAccumulator) Get() []bus.Event { + tea.lock.Lock() + defer tea.lock.Unlock() + + res := make([]bus.Event, len(tea.events)) + copy(res, tea.events) + return res +} + +func (tea *TestEventAccumulator) WaitForNumEvents(t *testing.T, targetLen int, timeout time.Duration) { + start := time.Now() + + for time.Now().Sub(start) < timeout { + if tea.Len() >= targetLen { + return + } + time.Sleep(time.Millisecond) + } + + t.Fatalf("Timed out waiting for num events to be %d", targetLen) +} From 0f7afa642ae6f81573a22843a6ca10a9f6863541 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Thu, 19 Dec 2019 14:52:14 -0700 Subject: [PATCH 15/22] add comments --- x-pack/libbeat/autodiscover/providers/aws/test/provider.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/x-pack/libbeat/autodiscover/providers/aws/test/provider.go b/x-pack/libbeat/autodiscover/providers/aws/test/provider.go index 9f08cb3bb27..353b6d62631 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/test/provider.go +++ b/x-pack/libbeat/autodiscover/providers/aws/test/provider.go @@ -12,12 +12,13 @@ import ( "github.com/elastic/beats/libbeat/common/bus" ) -// TestEventAccumulator +// TestEventAccumulator defined a list of events for testing type TestEventAccumulator struct { events []bus.Event lock sync.Mutex } +// Add expends events func (tea *TestEventAccumulator) Add(e bus.Event) { tea.lock.Lock() defer tea.lock.Unlock() @@ -25,6 +26,7 @@ func (tea *TestEventAccumulator) Add(e bus.Event) { tea.events = append(tea.events, e) } +// Len returns length of events func (tea *TestEventAccumulator) Len() int { tea.lock.Lock() defer tea.lock.Unlock() @@ -32,6 +34,7 @@ func (tea *TestEventAccumulator) Len() int { return len(tea.events) } +// Get copies the event and return it func (tea *TestEventAccumulator) Get() []bus.Event { tea.lock.Lock() defer tea.lock.Unlock() @@ -41,6 +44,7 @@ func (tea *TestEventAccumulator) Get() []bus.Event { return res } +// WaitForNumEvents waits to get target length of events func (tea *TestEventAccumulator) WaitForNumEvents(t *testing.T, targetLen int, timeout time.Duration) { start := time.Now() From 6af9ca2c3fb360af76648681cb788409952d22c5 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Thu, 19 Dec 2019 15:29:08 -0700 Subject: [PATCH 16/22] Fix metricbeat doc --- metricbeat/docs/configuring-howto.asciidoc | 2 -- .../libbeat/autodiscover/providers/aws/test/provider.go | 8 ++++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/metricbeat/docs/configuring-howto.asciidoc b/metricbeat/docs/configuring-howto.asciidoc index 168dbfee9e6..b282976c097 100644 --- a/metricbeat/docs/configuring-howto.asciidoc +++ b/metricbeat/docs/configuring-howto.asciidoc @@ -77,8 +77,6 @@ include::{libbeat-dir}/shared-env-vars.asciidoc[] :autodiscoverJolokia: :autodiscoverHints: -include::{libbeat-dir}/shared-autodiscover.asciidoc[] - :autodiscoverAWSEC2: include::{libbeat-dir}/shared-autodiscover.asciidoc[] :autodiscoverAWSEC2!: diff --git a/x-pack/libbeat/autodiscover/providers/aws/test/provider.go b/x-pack/libbeat/autodiscover/providers/aws/test/provider.go index 353b6d62631..46f5fbbbe85 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/test/provider.go +++ b/x-pack/libbeat/autodiscover/providers/aws/test/provider.go @@ -18,7 +18,7 @@ type TestEventAccumulator struct { lock sync.Mutex } -// Add expends events +// TestEventAccumulator.Add expends events func (tea *TestEventAccumulator) Add(e bus.Event) { tea.lock.Lock() defer tea.lock.Unlock() @@ -26,7 +26,7 @@ func (tea *TestEventAccumulator) Add(e bus.Event) { tea.events = append(tea.events, e) } -// Len returns length of events +// TestEventAccumulator.Len returns length of events func (tea *TestEventAccumulator) Len() int { tea.lock.Lock() defer tea.lock.Unlock() @@ -34,7 +34,7 @@ func (tea *TestEventAccumulator) Len() int { return len(tea.events) } -// Get copies the event and return it +// TestEventAccumulator.Get copies the event and return it func (tea *TestEventAccumulator) Get() []bus.Event { tea.lock.Lock() defer tea.lock.Unlock() @@ -44,7 +44,7 @@ func (tea *TestEventAccumulator) Get() []bus.Event { return res } -// WaitForNumEvents waits to get target length of events +// TestEventAccumulator.WaitForNumEvents waits to get target length of events func (tea *TestEventAccumulator) WaitForNumEvents(t *testing.T, targetLen int, timeout time.Duration) { start := time.Now() From 672c42bd0e4c07f51f81a04326154f5895c1822f Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Thu, 19 Dec 2019 16:30:06 -0700 Subject: [PATCH 17/22] Make houndci happy --- .../libbeat/autodiscover/providers/aws/test/provider.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/libbeat/autodiscover/providers/aws/test/provider.go b/x-pack/libbeat/autodiscover/providers/aws/test/provider.go index 46f5fbbbe85..353b6d62631 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/test/provider.go +++ b/x-pack/libbeat/autodiscover/providers/aws/test/provider.go @@ -18,7 +18,7 @@ type TestEventAccumulator struct { lock sync.Mutex } -// TestEventAccumulator.Add expends events +// Add expends events func (tea *TestEventAccumulator) Add(e bus.Event) { tea.lock.Lock() defer tea.lock.Unlock() @@ -26,7 +26,7 @@ func (tea *TestEventAccumulator) Add(e bus.Event) { tea.events = append(tea.events, e) } -// TestEventAccumulator.Len returns length of events +// Len returns length of events func (tea *TestEventAccumulator) Len() int { tea.lock.Lock() defer tea.lock.Unlock() @@ -34,7 +34,7 @@ func (tea *TestEventAccumulator) Len() int { return len(tea.events) } -// TestEventAccumulator.Get copies the event and return it +// Get copies the event and return it func (tea *TestEventAccumulator) Get() []bus.Event { tea.lock.Lock() defer tea.lock.Unlock() @@ -44,7 +44,7 @@ func (tea *TestEventAccumulator) Get() []bus.Event { return res } -// TestEventAccumulator.WaitForNumEvents waits to get target length of events +// WaitForNumEvents waits to get target length of events func (tea *TestEventAccumulator) WaitForNumEvents(t *testing.T, targetLen int, timeout time.Duration) { start := time.Now() From 8e70d892c847f27bc4cd7b033bb728e8501ae16e Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Mon, 3 Feb 2020 15:56:17 -0700 Subject: [PATCH 18/22] update changelog --- CHANGELOG.next.asciidoc | 59 ----------------------------------------- 1 file changed, 59 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 22192c481d8..9580f74f2d2 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -92,66 +92,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Affecting all Beats* -- Add a friendly log message when a request to docker has exceeded the deadline. {pull}15336[15336] -- Decouple Debug logging from fail_on_error logic for rename, copy, truncate processors {pull}12451[12451] -- Add an option to append to existing logs rather than always rotate on start. {pull}11953[11953] -- Add `network` condition to processors for matching IP addresses against CIDRs. {pull}10743[10743] -- Add if/then/else support to processors. {pull}10744[10744] -- Add `community_id` processor for computing network flow hashes. {pull}10745[10745] -- Add output test to kafka output {pull}10834[10834] -- Gracefully shut down on SIGHUP {pull}10704[10704] -- New processor: `copy_fields`. {pull}11303[11303] -- Add `error.message` to events when `fail_on_error` is set in `rename` and `copy_fields` processors. {pull}11303[11303] -- New processor: `truncate_fields`. {pull}11297[11297] -- Allow a beat to ship monitoring data directly to an Elasticsearch monitoring cluster. {pull}9260[9260] -- Updated go-seccomp-bpf library to v1.1.0 which updates syscall lists for Linux v5.0. {pull}11394[11394] -- Add `add_observer_metadata` processor. {pull}11394[11394] -- Add `decode_csv_fields` processor. {pull}11753[11753] -- Add `convert` processor for converting data types of fields. {issue}8124[8124] {pull}11686[11686] -- New `extract_array` processor. {pull}11761[11761] -- Add number of goroutines to reported metrics. {pull}12135[12135] -- Add `proxy_disable` output flag to explicitly ignore proxy environment variables. {issue}11713[11713] {pull}12243[12243] -- Processor `add_cloud_metadata` adds fields `cloud.account.id` and `cloud.image.id` for AWS EC2. {pull}12307[12307] -- Add configurable bulk_flush_frequency in kafka output. {pull}12254[12254] -- Add `decode_base64_field` processor for decoding base64 field. {pull}11914[11914] -- Add support for reading the `network.iana_number` field by default to the community_id processor. {pull}12701[12701] -- Add aws overview dashboard. {issue}11007[11007] {pull}12175[12175] -- Add `decompress_gzip_field` processor. {pull}12733[12733] -- Add `timestamp` processor for parsing time fields. {pull}12699[12699] -- Fail with error when autodiscover providers have no defined configs. {pull}13078[13078] -- Add a check so alias creation explicitely fails if there is an index with the same name. {pull}13070[13070] -- Update kubernetes watcher to use official client-go libraries. {pull}13051[13051] -- Add support for unix epoch time values in the `timestamp` processor. {pull}13319[13319] -- add_host_metadata is now GA. {pull}13148[13148] -- Add an `ignore_missing` configuration option the `drop_fields` processor. {pull}13318[13318] -- add_host_metadata is no GA. {pull}13148[13148] -- Add `registered_domain` processor for deriving the registered domain from a given FQDN. {pull}13326[13326] -- Add support for RFC3339 time zone offsets in JSON output. {pull}13227[13227] -- Add autodetection mode for add_docker_metadata and enable it by default in included configuration files{pull}13374[13374] -- Added `monitoring.cluster_uuid` setting to associate Beat data with specified ES cluster in Stack Monitoring UI. {pull}13182[13182] -- Add autodetection mode for add_kubernetes_metadata and enable it by default in included configuration files. {pull}13473[13473] -- Add `providers` setting to `add_cloud_metadata` processor. {pull}13812[13812] -- Use less restrictive API to check if template exists. {pull}13847[13847] -- Do not check for alias when setup.ilm.check_exists is false. {pull}13848[13848] -- Add support for numeric time zone offsets in timestamp processor. {pull}13902[13902] -- Add condition to the config file template for add_kubernetes_metadata {pull}14056[14056] -- Marking Central Management deprecated. {pull}14018[14018] -- Add `keep_null` setting to allow Beats to publish null values in events. {issue}5522[5522] {pull}13928[13928] -- Add shared_credential_file option in aws related config for specifying credential file directory. {issue}14157[14157] {pull}14178[14178] -- GA the `script` processor. {pull}14325[14325] -- Add `fingerprint` processor. {issue}11173[11173] {pull}14205[14205] -- Add support for API keys in Elasticsearch outputs. {pull}14324[14324] -- Ensure that init containers are no longer tailed after they stop {pull}14394[14394] -- Add consumer_lag in Kafka consumergroup metricset {pull}14822[14822] -- Make use of consumer_lag in Kafka dashboard {pull}14863[14863] -- Refactor kubernetes autodiscover to enable different resource based discovery {pull}14738[14738] -- Add `add_id` processor. {pull}14524[14524] -- Enable TLS 1.3 in all beats. {pull}12973[12973] - Add `aws_ec2` provider for autodiscover. {issue}12518[12518] {pull}14823[14823] -- Enable DEP (Data Execution Protection) for Windows packages. {pull}15149[15149] -- Spooling to disk creates a lockfile on each platform. {pull}15338[15338] -- Add document_id setting to decode_json_fields processor. {pull}15859[15859] - *Auditbeat* From eb0d83f512d6f16467b426df158d90209252748f Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Mon, 3 Feb 2020 15:57:04 -0700 Subject: [PATCH 19/22] update changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 9580f74f2d2..e7eed5b7a10 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -92,6 +92,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Affecting all Beats* +- Add document_id setting to decode_json_fields processor. {pull}15859[15859] - Add `aws_ec2` provider for autodiscover. {issue}12518[12518] {pull}14823[14823] *Auditbeat* From 48f121c5146eaa3da69a3a67aec8665f589c0c0a Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Wed, 5 Feb 2020 18:52:44 -0700 Subject: [PATCH 20/22] Add aws prefix to meta ec2 --- .../autodiscover/providers/aws/ec2/provider.go | 4 +++- .../providers/aws/ec2/provider_test.go | 17 ++++++++++------- .../autodiscover/providers/aws/ec2/watch.go | 2 +- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go index bab0ccf4b79..be83d22ab50 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go @@ -119,7 +119,9 @@ func (p *Provider) onWatcherStart(instanceID string, instance *ec2Instance) { "provider": p.uuid, "id": instanceID, "meta": common.MapStr{ - "ec2": instance.toMap(), + "aws": common.MapStr{ + "ec2": instance.toMap(), + }, "cloud": instance.toCloudMap(), }, } diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider_test.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider_test.go index 45ac81bb53b..d86f15afea4 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider_test.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider_test.go @@ -8,22 +8,23 @@ import ( "testing" "time" - "github.com/gofrs/uuid" - "github.com/pkg/errors" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/bus" + "github.com/elastic/beats/libbeat/logp" awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws/test" + "github.com/gofrs/uuid" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func Test_internalBuilder(t *testing.T) { instance := fakeEC2Instance() instances := []*ec2Instance{instance} fetcher := newMockFetcher(instances, nil) - pBus := bus.New("test") + log := logp.NewLogger("ec2") + pBus := bus.New(log, "test") cfg := &awsauto.Config{ Regions: []string{"us-east-1a", "us-west-1b"}, @@ -66,7 +67,9 @@ func Test_internalBuilder(t *testing.T) { "provider": uuid, "start": true, "meta": common.MapStr{ - "ec2": instance.toMap(), + "aws": common.MapStr{ + "ec2": instance.toMap(), + }, "cloud": instance.toCloudMap(), }, } diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go index 223d130ec29..4ae56465d9c 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go @@ -77,7 +77,7 @@ func (w *watcher) once() error { if err != nil { return err } - w.logger.Debugf("autodiscover-ec2", "fetched %d ec2 instances from AWS for autodiscovery", len(fetchedEC2s)) + w.logger.Debugf("fetched %d ec2 instances from AWS for autodiscover", len(fetchedEC2s)) oldGen := w.gen w.gen++ From 656379e3c08e12d007c3b3529c98144d77dc0c89 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Thu, 6 Feb 2020 06:24:05 -0700 Subject: [PATCH 21/22] update field names from ec2 to aws.ec2 --- libbeat/docs/shared-autodiscover.asciidoc | 30 +++++++++---------- .../docs/autodiscover-aws-ec2-config.asciidoc | 4 +-- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/libbeat/docs/shared-autodiscover.asciidoc b/libbeat/docs/shared-autodiscover.asciidoc index ced071154b9..5ddda476ddd 100644 --- a/libbeat/docs/shared-autodiscover.asciidoc +++ b/libbeat/docs/shared-autodiscover.asciidoc @@ -378,7 +378,7 @@ for more information. If you do not wish to use these, you may explicitly set th `secret_access_key` variables. These are the available fields during within config templating. -The `ec2.*` fields and `cloud.*` fields will be available on each emitted event. +The `aws.ec2.*` fields and `cloud.*` fields will be available on each emitted event. * cloud.availability_zone * cloud.instance.id @@ -386,20 +386,20 @@ The `ec2.*` fields and `cloud.*` fields will be available on each emitted event. * cloud.provider * cloud.region -* ec2.architecture -* ec2.image.id -* ec2.kernel.id -* ec2.monitoring.state -* ec2.private.dns_name -* ec2.private.ip -* ec2.public.dns_name -* ec2.public.ip -* ec2.root_device_name -* ec2.state.code -* ec2.state.name -* ec2.subnet.id -* ec2.tags -* ec2.vpc.id +* aws.ec2.architecture +* aws.ec2.image.id +* aws.ec2.kernel.id +* aws.ec2.monitoring.state +* aws.ec2.private.dns_name +* aws.ec2.private.ip +* aws.ec2.public.dns_name +* aws.ec2.public.ip +* aws.ec2.root_device_name +* aws.ec2.state.code +* aws.ec2.state.name +* aws.ec2.subnet.id +* aws.ec2.tags +* aws.ec2.vpc.id include::../../{beatname_lc}/docs/autodiscover-aws-ec2-config.asciidoc[] diff --git a/metricbeat/docs/autodiscover-aws-ec2-config.asciidoc b/metricbeat/docs/autodiscover-aws-ec2-config.asciidoc index 261a2af0163..eefa06064f8 100644 --- a/metricbeat/docs/autodiscover-aws-ec2-config.asciidoc +++ b/metricbeat/docs/autodiscover-aws-ec2-config.asciidoc @@ -10,12 +10,12 @@ metricbeat.autodiscover: templates: - condition: equals: - meta.ec2.tags.service: "mysql" + meta.aws.ec2.tags.service: "mysql" config: - module: mysql metricsets: ["status", "galera_status"] period: 10s - hosts: ["root:password@tcp(${data.meta.ec2.public.ip}:3306)/"] + hosts: ["root:password@tcp(${data.meta.aws.ec2.public.ip}:3306)/"] username: root password: password ------------------------------------------------------------------------------------- From 286d2ada91fab4bf39a379deb43887fdd9d98e79 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Thu, 6 Feb 2020 09:13:23 -0700 Subject: [PATCH 22/22] Add aws.ec2.* to autodiscover template --- metricbeat/docs/autodiscover-aws-ec2-config.asciidoc | 4 ++-- x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go | 6 +++++- .../libbeat/autodiscover/providers/aws/ec2/provider_test.go | 6 +++++- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/metricbeat/docs/autodiscover-aws-ec2-config.asciidoc b/metricbeat/docs/autodiscover-aws-ec2-config.asciidoc index eefa06064f8..42f9497c2aa 100644 --- a/metricbeat/docs/autodiscover-aws-ec2-config.asciidoc +++ b/metricbeat/docs/autodiscover-aws-ec2-config.asciidoc @@ -10,12 +10,12 @@ metricbeat.autodiscover: templates: - condition: equals: - meta.aws.ec2.tags.service: "mysql" + aws.ec2.tags.service: "mysql" config: - module: mysql metricsets: ["status", "galera_status"] period: 10s - hosts: ["root:password@tcp(${data.meta.aws.ec2.public.ip}:3306)/"] + hosts: ["root:password@tcp(${data.aws.ec2.public.ip}:3306)/"] username: root password: password ------------------------------------------------------------------------------------- diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go index be83d22ab50..4c9aa3e8b43 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go @@ -118,9 +118,13 @@ func (p *Provider) onWatcherStart(instanceID string, instance *ec2Instance) { "start": true, "provider": p.uuid, "id": instanceID, + "aws": common.MapStr{ + "ec2": instance.toMap(), + }, + "cloud": instance.toCloudMap(), "meta": common.MapStr{ "aws": common.MapStr{ - "ec2": instance.toMap(), + "ec2": instance.toMap(), }, "cloud": instance.toCloudMap(), }, diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider_test.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider_test.go index d86f15afea4..b9c3acc1748 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider_test.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider_test.go @@ -66,9 +66,13 @@ func Test_internalBuilder(t *testing.T) { "id": instance.instanceID(), "provider": uuid, "start": true, + "aws": common.MapStr{ + "ec2": instance.toMap(), + }, + "cloud": instance.toCloudMap(), "meta": common.MapStr{ "aws": common.MapStr{ - "ec2": instance.toMap(), + "ec2": instance.toMap(), }, "cloud": instance.toCloudMap(), },