[Processor: add_cloud_metadata] Use AWS client to get instance metadata and EKS cluster name #35182

Merged
merged 32 commits into from
May 31, 2023
8d26505
add generic metadata fetcher
tetianakravchenko Jan 23, 2023
3da87ca
Merge branch 'main' into eks-cluster-name
tetianakravchenko Mar 2, 2023
39a690b
Merge branch 'main' into eks-cluster-name
tetianakravchenko Apr 4, 2023
481f65c
merge main
tetianakravchenko Apr 10, 2023
4886d96
Merge branch 'main' into eks-cluster-name
tetianakravchenko Apr 10, 2023
76a4da4
clean up
tetianakravchenko Apr 10, 2023
d659712
move tagDescribe to different func
tetianakravchenko Apr 18, 2023
737f3d3
add tests for add_cloud_metadata
tetianakravchenko Apr 24, 2023
9255c71
Tiltfile: fix docker_registry, use more generic value
tetianakravchenko Apr 24, 2023
db67e1b
add notice file
tetianakravchenko Apr 24, 2023
bb5e429
fix tests - add former test cases; fix linter issues
tetianakravchenko Apr 26, 2023
98b5cb5
handle correctly result.err
tetianakravchenko Apr 26, 2023
fb0293e
add generic metadata fetcher
tetianakravchenko Jan 23, 2023
5b5e54d
merge main
tetianakravchenko Apr 10, 2023
1769926
clean up
tetianakravchenko Apr 10, 2023
64439f3
move tagDescribe to different func
tetianakravchenko Apr 18, 2023
bbd928c
add tests for add_cloud_metadata
tetianakravchenko Apr 24, 2023
b193f5c
Tiltfile: fix docker_registry, use more generic value
tetianakravchenko Apr 24, 2023
add4956
add notice file
tetianakravchenko Apr 24, 2023
1b865c9
fix tests - add former test cases; fix linter issues
tetianakravchenko Apr 26, 2023
b6c28bd
handle correctly result.err
tetianakravchenko Apr 26, 2023
da6ba8b
Merge branch 'eks-cluster-name' of github.com:tetianakravchenko/beats…
tetianakravchenko Apr 27, 2023
0241f22
address reviews
tetianakravchenko Apr 27, 2023
1b23f61
Update dev-tools/kubernetes/Tiltfile
tetianakravchenko May 3, 2023
79070e6
fix the types.TagDescription struct
tetianakravchenko May 14, 2023
1679fed
Merge branch 'eks-cluster-name' of github.com:tetianakravchenko/beats…
tetianakravchenko May 14, 2023
87a3dd7
remove not used variable; fix types.TagDescription struct
tetianakravchenko May 14, 2023
f4ae2cf
add a changelog record
tetianakravchenko May 15, 2023
979778b
Merge branch 'main' into eks-cluster-name
tetianakravchenko May 15, 2023
19ab154
change Debugf to Warnf
tetianakravchenko May 30, 2023
b9ed1e8
merge main; fix conflicts
tetianakravchenko May 30, 2023
29c4e2e
Merge branch 'main' into eks-cluster-name
tetianakravchenko May 30, 2023
424 changes: 212 additions & 212 deletions NOTICE.txt

Large diffs are not rendered by default.

24 changes: 18 additions & 6 deletions dev-tools/kubernetes/Tiltfile
@@ -47,6 +47,18 @@ def build(
default_registry(docker_registry)
print("Docker registry: {}".format(docker_registry))

if k8s_env == "aws":
# In order to push to AWS you need to run:
# aws ecr get-login-password --region us-east-2 | docker login --username AWS --password-stdin XXXXX.dkr.ecr.us-east-2.amazonaws.com/metricbeat-debug
#
# More info at https://docs.aws.amazon.com/AmazonECR/latest/userguide/docker-push-ecr-image.html
docker_registry = "XXXXX.dkr.ecr.us-east-2.amazonaws.com".format(
docker_image)

default_registry(docker_registry)
print("Docker registry: {}".format(docker_registry))


print("Docker image: {}".format(docker_image))

docker_file = '{}/Dockerfile.{}'.format(beat, mode)
@@ -149,7 +161,7 @@ def beat(
if arch not in ["arm64", "amd64"]:
print("Invalid arch: {}".format(arch))
exit(-1)
if k8s_env not in ["kind", "gcp"]:
if k8s_env not in ["kind", "gcp", "aws"]:
print("Invalid k8s_env: {}".format(k8s_env))
exit(-1)
if k8s_cluster not in ["single", "multi"]:
@@ -191,10 +203,10 @@ def beat(
k8s_expose(beat=beat, mode=mode, k8s_cluster=k8s_cluster)


beat(beat="heartbeat",
mode="debug",
# mode="run",
arch="arm64",
k8s_env="kind",
beat(beat="metricbeat",
# mode="debug",
mode="run",
arch="amd64",
k8s_env="aws",
k8s_cluster="single",
)
2 changes: 1 addition & 1 deletion go.mod
@@ -187,6 +187,7 @@ require (
cloud.google.com/go v0.105.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.4.1
github.com/Azure/go-autorest/autorest/adal v0.9.14
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.7
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.17
github.com/aws/aws-sdk-go-v2/service/cloudformation v1.20.4
github.com/aws/aws-sdk-go-v2/service/kinesis v1.15.8
@@ -235,7 +236,6 @@ require (
github.com/armon/go-radix v1.0.0 // indirect
github.com/aws/aws-sdk-go v1.38.60 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.3 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.7 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.13 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.7 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.14 // indirect
55 changes: 55 additions & 0 deletions libbeat/processors/add_cloud_metadata/generic_fetcher.go
@@ -0,0 +1,55 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package add_cloud_metadata

import (
"context"
"net/http"

cfg "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
)

type genericFetcher struct {
provider string
schema schemaConv
fetchRawProviderMetadata func(context.Context, http.Client, *result)
}

func newGenericMetadataFetcher(
c *cfg.C,
provider string,
conv schemaConv,
genericFetcherMeta func(context.Context, http.Client, *result),
) (*genericFetcher, error) {

gFetcher := &genericFetcher{provider, conv, genericFetcherMeta}
return gFetcher, nil
}

func (g *genericFetcher) fetchMetadata(ctx context.Context, client http.Client) result {
res := result{provider: g.provider, metadata: mapstr.M{}}
g.fetchRawProviderMetadata(ctx, client, &res)
if res.err != nil {
return res
}
res.metadata = g.schema(res.metadata)
_, _ = res.metadata.Put("cloud.provider", g.provider)

return res
}
192 changes: 101 additions & 91 deletions libbeat/processors/add_cloud_metadata/provider_aws_ec2.go
@@ -18,125 +18,135 @@
package add_cloud_metadata

import (
"context"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
awscfg "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/ec2/imds"
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/ec2/types"
"github.com/pkg/errors"
Contributor

Maybe just import fmt

Contributor Author

done - 0241f22


"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"

s "github.com/elastic/beats/v7/libbeat/common/schema"
c "github.com/elastic/beats/v7/libbeat/common/schema/mapstriface"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/transport/tlscommon"
)

const (
ec2InstanceIdentityURI = "/2014-02-25/dynamic/instance-identity/document"
ec2InstanceIMDSv2TokenValueHeader = "X-aws-ec2-metadata-token"
ec2InstanceIMDSv2TokenTTLHeader = "X-aws-ec2-metadata-token-ttl-seconds"
ec2InstanceIMDSv2TokenTTLValue = "21600"
ec2InstanceIMDSv2TokenURI = "/latest/api/token"
)

// fetches IMDSv2 token, returns empty one on errors
func getIMDSv2Token(c *conf.C) string {
logger := logp.NewLogger("add_cloud_metadata")
type IMDSClient interface {
GetInstanceIdentityDocument(ctx context.Context, params *imds.GetInstanceIdentityDocumentInput, optFns ...func(*imds.Options)) (*imds.GetInstanceIdentityDocumentOutput, error)
}

config := defaultConfig()
if err := c.Unpack(&config); err != nil {
logger.Warnf("error when load config for getting IMDSv2 token: %s. No token in the metadata request will be used.", err)
return ""
}
var NewIMDSClient func(cfg awssdk.Config) IMDSClient = func(cfg awssdk.Config) IMDSClient {
return imds.NewFromConfig(cfg)
}

tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS)
if err != nil {
logger.Warnf("error when load TLS config for getting IMDSv2 token: %s. No token in the metadata request will be used.", err)
return ""
}
type EC2Client interface {
DescribeTags(ctx context.Context, params *ec2.DescribeTagsInput, optFns ...func(*ec2.Options)) (*ec2.DescribeTagsOutput, error)
}

client := http.Client{
Timeout: config.Timeout,
Transport: &http.Transport{
DisableKeepAlives: true,
DialContext: (&net.Dialer{
Timeout: config.Timeout,
KeepAlive: 0,
}).DialContext,
TLSClientConfig: tlsConfig.ToConfig(),
},
}
var NewEC2Client func(cfg awssdk.Config) EC2Client = func(cfg awssdk.Config) EC2Client {
return ec2.NewFromConfig(cfg)
}

tokenReq, err := http.NewRequest("PUT", fmt.Sprintf("http://%s%s", metadataHost, ec2InstanceIMDSv2TokenURI), nil)
if err != nil {
logger.Warnf("error when make token request for getting IMDSv2 token: %s. No token in the metadata request will be used.", err)
return ""
}
// AWS EC2 Metadata Service
var ec2MetadataFetcher = provider{
Name: "aws-ec2",

tokenReq.Header.Add(ec2InstanceIMDSv2TokenTTLHeader, ec2InstanceIMDSv2TokenTTLValue)
rsp, err := client.Do(tokenReq)
if rsp == nil {
logger.Warnf("read token request for getting IMDSv2 token returns empty: %s. No token in the metadata request will be used.", err)
return ""
}
Local: true,

defer func(body io.ReadCloser) {
if body != nil {
body.Close()
Create: func(_ string, config *conf.C) (metadataFetcher, error) {
ec2Schema := func(m map[string]interface{}) mapstr.M {
m["service"] = mapstr.M{
"name": "EC2",
}
return mapstr.M{"cloud": m}
}
}(rsp.Body)

fetcher, err := newGenericMetadataFetcher(config, "aws", ec2Schema, fetchRawProviderMetadata)
return fetcher, err
},
}

// fetchRaw queries raw metadata from a hosting provider's metadata service.
func fetchRawProviderMetadata(
ctx context.Context,
client http.Client,
result *result,
) {
logger := logp.NewLogger("add_cloud_metadata")

// LoadDefaultConfig loads the EC2 role credentials
awsConfig, err := awscfg.LoadDefaultConfig(context.TODO(), awscfg.WithHTTPClient(&client))
if err != nil {
logger.Warnf("error when read token request for getting IMDSv2 token: %s. No token in the metadata request will be used.", err)
return ""
logger.Debugf("error loading AWS default configuration: %s.", err)
Contributor

This is an error message, is there a reason the log level is debug? Should be Errorf.

Contributor Author
@tetianakravchenko May 30, 2023

add_cloud_metadata tries to fetch the metadata from all the available cloud providers, so when running on GCP, fetchMetadata for AWS will still run, and in that case it might be confusing to get errors on GCP about the AWS configuration. I've changed it to Warnf, as it was before - 19ab154

result.err = errors.Wrapf(err, "failed loading AWS default configuration")
Contributor

Suggested change
result.err = errors.Wrapf(err, "failed loading AWS default configuration")
result.err = fmt.Errorf("failed loading AWS default configuration: %w", err)

return
}
awsClient := NewIMDSClient(awsConfig)

if rsp.StatusCode != http.StatusOK {
logger.Warnf("error when check request status for getting IMDSv2 token: http request status %d. No token in the metadata request will be used.", rsp.StatusCode)
return ""
instanceIdentity, err := awsClient.GetInstanceIdentityDocument(context.TODO(), &imds.GetInstanceIdentityDocumentInput{})
if err != nil {
logger.Debugf("error fetching EC2 Identity Document: %s.", err)
result.err = errors.Wrapf(err, "failed fetching EC2 Identity Document.")
Contributor

Same as above: use fmt.Errorf

return
}

all, err := ioutil.ReadAll(rsp.Body)
// AWS Region must be set to be able to get EC2 Tags
awsRegion := instanceIdentity.InstanceIdentityDocument.Region
awsConfig.Region = awsRegion

clusterName, err := fetchEC2ClusterNameTag(awsConfig, instanceIdentity.InstanceIdentityDocument.InstanceID)
if err != nil {
logger.Warnf("error when reading token request for getting IMDSv2 token: %s. No token in the metadata request will be used.", err)
return ""
logger.Debugf("error fetching cluster name metadata: %s.", err)
Contributor
@gizas Apr 27, 2023

Can this be Warnf level rather than Debugf?

Contributor
@gizas May 9, 2023

@tetianakravchenko only a minor point: consider whether we need to expose this as a warning or an error, since it is quite important to print it by default, or repeat what you do in this line:

result.err = errors.Wrapf(err, "failed fetching EC2 Identity Document.")

Contributor Author

@gizas thank you for pointing it out! From my understanding it should be fine to use it this way, http fetcher uses this definition as well -

result.err = errors.Wrapf(err, "failed to create http request for %v", f.provider)

Contributor

My comment has to do more with the level error vs debug vs warn. I would say not to use debug as it would hide any messages unless you raise the log level

Contributor Author

this code is outdated - it was replaced with Warn in 0241f22

}

return string(all)
}
accountID := instanceIdentity.InstanceIdentityDocument.AccountID

// AWS EC2 Metadata Service
var ec2MetadataFetcher = provider{
Name: "aws-ec2",
_, _ = result.metadata.Put("instance.id", instanceIdentity.InstanceIdentityDocument.InstanceID)
_, _ = result.metadata.Put("machine.type", instanceIdentity.InstanceIdentityDocument.InstanceType)
_, _ = result.metadata.Put("region", awsRegion)
_, _ = result.metadata.Put("availability_zone", instanceIdentity.InstanceIdentityDocument.AvailabilityZone)
_, _ = result.metadata.Put("account.id", accountID)
_, _ = result.metadata.Put("image.id", instanceIdentity.InstanceIdentityDocument.ImageID)

Local: true,
// for AWS cluster ID is used cluster ARN: arn:partition:service:region:account-id:resource-type/resource-id, example:
// arn:aws:eks:us-east-2:627286350134:cluster/cluster-name
if clusterName != "" {
clusterARN := fmt.Sprintf("arn:aws:eks:%s:%s:cluster/%v", awsRegion, accountID, clusterName)

Create: func(_ string, config *conf.C) (metadataFetcher, error) {
ec2Schema := func(m map[string]interface{}) mapstr.M {
m["serviceName"] = "EC2"
out, _ := s.Schema{
"instance": s.Object{"id": c.Str("instanceId")},
"machine": s.Object{"type": c.Str("instanceType")},
"region": c.Str("region"),
"availability_zone": c.Str("availabilityZone"),
"service": s.Object{
"name": c.Str("serviceName"),
},
"account": s.Object{"id": c.Str("accountId")},
"image": s.Object{"id": c.Str("imageId")},
}.Apply(m)
return mapstr.M{"cloud": out}
}
_, _ = result.metadata.Put("orchestrator.cluster.name", clusterName)
_, _ = result.metadata.Put("orchestrator.cluster.id", clusterARN)
Contributor Author
@tetianakravchenko May 5, 2023

FYI: this field is already defined in ECS as orchestrator.cluster.id

}
}

headers := make(map[string]string, 1)
token := getIMDSv2Token(config)
if len(token) > 0 {
headers[ec2InstanceIMDSv2TokenValueHeader] = token
}
func fetchEC2ClusterNameTag(awsConfig awssdk.Config, instanceID string) (string, error) {
svc := NewEC2Client(awsConfig)
input := &ec2.DescribeTagsInput{
Filters: []types.Filter{
{
Name: awssdk.String("resource-id"),
Values: []string{
instanceID,
},
},
{
Name: awssdk.String("key"),
Values: []string{
"eks:cluster-name",
},
},
},
}

fetcher, err := newMetadataFetcher(config, "aws", headers, metadataHost, ec2Schema, ec2InstanceIdentityURI)
return fetcher, err
},
tagsResult, err := svc.DescribeTags(context.TODO(), input)
if err != nil {
return "", fmt.Errorf("error fetching EC2 Tags: %w", err)
}
if len(tagsResult.Tags) > 0 {
Contributor

quick question here: since we are only returning Tags[0], does that mean we only have one set of tag key/value returned from the DescribeTags API call? If so, should we check len(tagsResult.Tags) == 1 instead?

Contributor Author
@tetianakravchenko May 8, 2023

yes, only 1 element is expected; it should be the same as running:

instance_id=$(curl http://169.254.169.254/latest/meta-data/instance-id)
aws ec2 describe-tags --filter Name=resource-id,Values=$instance_id Name=key,Values=eks:cluster-name --region us-east-2
{
    "Tags": [
        {
            "Key": "eks:cluster-name",
            "ResourceId": "i-020c696ca32297693",
            "ResourceType": "instance",
            "Value": "cluster-name"
        }
    ]
}

Contributor Author

@kaiyan-sheng I've changed it in 79070e6
I found a mistake I made in the types.TagDescription struct; it now returns exactly 1 element

return *tagsResult.Tags[0].Value, nil
}
return "", nil
}