Skip to content

Commit

Permalink
[Processor: add_cloud_metadata] Use AWS client to get instance metada…
Browse files Browse the repository at this point in the history
…ta and EKS cluster name (#35182)

* add generic metadata fetcher

Signed-off-by: Tetiana Kravchenko <[email protected]>

* merge main

Signed-off-by: Tetiana Kravchenko <[email protected]>

* clean up

Signed-off-by: Tetiana Kravchenko <[email protected]>

* move tagDescribe to different func

Signed-off-by: Tetiana Kravchenko <[email protected]>

* add tests for add_cloud_metadata

Signed-off-by: Tetiana Kravchenko <[email protected]>

* Tiltfile: fix docker_registry, use more generic value

Signed-off-by: Tetiana Kravchenko <[email protected]>

* add notice file

Signed-off-by: Tetiana Kravchenko <[email protected]>

* fix tests - add former test cases; fix linter issues

Signed-off-by: Tetiana Kravchenko <[email protected]>

* handle correctly result.err

Signed-off-by: Tetiana Kravchenko <[email protected]>

* add generic metadata fetcher

Signed-off-by: Tetiana Kravchenko <[email protected]>

* merge main

Signed-off-by: Tetiana Kravchenko <[email protected]>

* clean up

Signed-off-by: Tetiana Kravchenko <[email protected]>

* move tagDescribe to different func

Signed-off-by: Tetiana Kravchenko <[email protected]>

* add tests for add_cloud_metadata

Signed-off-by: Tetiana Kravchenko <[email protected]>

* Tiltfile: fix docker_registry, use more generic value

Signed-off-by: Tetiana Kravchenko <[email protected]>

* add notice file

Signed-off-by: Tetiana Kravchenko <[email protected]>

* fix tests - add former test cases; fix linter issues

Signed-off-by: Tetiana Kravchenko <[email protected]>

* handle correctly result.err

Signed-off-by: Tetiana Kravchenko <[email protected]>

* address reviews

Signed-off-by: Tetiana Kravchenko <[email protected]>

* Update dev-tools/kubernetes/Tiltfile

Co-authored-by: kaiyan-sheng <[email protected]>

* fix the types.TagDescription struct

Signed-off-by: Tetiana Kravchenko <[email protected]>

* remove not used variable; fix types.TagDescription struct

Signed-off-by: Tetiana Kravchenko <[email protected]>

* add a changelog record

Signed-off-by: Tetiana Kravchenko <[email protected]>

* change Debugf to Warnf

Signed-off-by: Tetiana Kravchenko <[email protected]>

---------

Signed-off-by: Tetiana Kravchenko <[email protected]>
Co-authored-by: kaiyan-sheng <[email protected]>
  • Loading branch information
2 people authored and chrisberkhout committed Jun 1, 2023
1 parent d9d0db0 commit 6c7cfb4
Show file tree
Hide file tree
Showing 8 changed files with 593 additions and 414 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.0.1
rev: v4.4.0
hooks:
- id: check-merge-conflict

- repo: https://github.com/elastic/apm-pipeline-library.git
rev: current
rev: v1.1.397
hooks:
- id: check-jenkins-pipelines
files: ^(.ci/(.*\.groovy|Jenkinsfile)|Jenkinsfile)$
Expand Down
3 changes: 1 addition & 2 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ https://github.com/elastic/beats/compare/v8.7.1\...main[Check the HEAD diff]

*Affecting all Beats*


*Auditbeat*


Expand Down Expand Up @@ -228,7 +227,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Add Hetzner Cloud as a provider for `add_cloud_metadata`. {pull}35456[35456]
- Reload Beat when TLS certificates or key files are modified. {issue}34408[34408] {pull}34416[34416]
- Upgrade version of elastic-agent-autodiscover to v0.6.1 for improved memory consumption on k8s. {pull}35483[35483]

- Added `orchestrator.cluster.id` and `orchestrator.cluster.name` fields to the add_cloud_metadata processor, AWS cloud provider. {pull}35182[35182]

*Auditbeat*

Expand Down
424 changes: 212 additions & 212 deletions NOTICE.txt

Large diffs are not rendered by default.

24 changes: 18 additions & 6 deletions dev-tools/kubernetes/Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ def build(
default_registry(docker_registry)
print("Docker registry: {}".format(docker_registry))

if k8s_env == "aws":
# In order to push to AWS you need to run:
# aws ecr get-login-password --region us-east-2 | docker login --username AWS --password-stdin XXXXX.dkr.ecr.us-east-2.amazonaws.com/metricbeat-debug
#
# More info at https://docs.aws.amazon.com/AmazonECR/latest/userguide/docker-push-ecr-image.html
docker_registry = "XXXXX.dkr.ecr.us-east-2.amazonaws.com".format(
docker_image)

default_registry(docker_registry)
print("Docker registry: {}".format(docker_registry))


print("Docker image: {}".format(docker_image))

docker_file = '{}/Dockerfile.{}'.format(beat, mode)
Expand Down Expand Up @@ -149,7 +161,7 @@ def beat(
if arch not in ["arm64", "amd64"]:
print("Invalid arch: {}".format(arch))
exit(-1)
if k8s_env not in ["kind", "gcp"]:
if k8s_env not in ["kind", "gcp", "aws"]:
print("Invalid k8s_env: {}".format(k8s_env))
exit(-1)
if k8s_cluster not in ["single", "multi"]:
Expand Down Expand Up @@ -191,10 +203,10 @@ def beat(
k8s_expose(beat=beat, mode=mode, k8s_cluster=k8s_cluster)


beat(beat="heartbeat",
mode="debug",
# mode="run",
arch="arm64",
k8s_env="kind",
beat(beat="metricbeat",
# mode="debug",
mode="run",
arch="amd64",
k8s_env="aws",
k8s_cluster="single",
)
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.4.1
github.com/Azure/go-autorest/autorest/adal v0.9.14
github.com/apache/arrow/go/v11 v11.0.0
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.7
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.17
github.com/aws/aws-sdk-go-v2/service/cloudformation v1.20.4
github.com/aws/aws-sdk-go-v2/service/kinesis v1.15.8
Expand Down Expand Up @@ -242,7 +243,6 @@ require (
github.com/armon/go-radix v1.0.0 // indirect
github.com/aws/aws-sdk-go v1.38.60 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.3 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.7 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.14 // indirect
Expand Down
55 changes: 55 additions & 0 deletions libbeat/processors/add_cloud_metadata/generic_fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package add_cloud_metadata

import (
"context"
"net/http"

cfg "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
)

type genericFetcher struct {
provider string
schema schemaConv
fetchRawProviderMetadata func(context.Context, http.Client, *result)
}

func newGenericMetadataFetcher(
c *cfg.C,
provider string,
conv schemaConv,
genericFetcherMeta func(context.Context, http.Client, *result),
) (*genericFetcher, error) {

gFetcher := &genericFetcher{provider, conv, genericFetcherMeta}
return gFetcher, nil
}

func (g *genericFetcher) fetchMetadata(ctx context.Context, client http.Client) result {
res := result{provider: g.provider, metadata: mapstr.M{}}
g.fetchRawProviderMetadata(ctx, client, &res)
if res.err != nil {
return res
}
res.metadata = g.schema(res.metadata)
_, _ = res.metadata.Put("cloud.provider", g.provider)

return res
}
191 changes: 100 additions & 91 deletions libbeat/processors/add_cloud_metadata/provider_aws_ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,125 +18,134 @@
package add_cloud_metadata

import (
"context"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
awscfg "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/ec2/imds"
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/ec2/types"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"

s "github.com/elastic/beats/v7/libbeat/common/schema"
c "github.com/elastic/beats/v7/libbeat/common/schema/mapstriface"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/transport/tlscommon"
)

const (
ec2InstanceIdentityURI = "/2014-02-25/dynamic/instance-identity/document"
ec2InstanceIMDSv2TokenValueHeader = "X-aws-ec2-metadata-token"
ec2InstanceIMDSv2TokenTTLHeader = "X-aws-ec2-metadata-token-ttl-seconds"
ec2InstanceIMDSv2TokenTTLValue = "21600"
ec2InstanceIMDSv2TokenURI = "/latest/api/token"
)

// fetches IMDSv2 token, returns empty one on errors
func getIMDSv2Token(c *conf.C) string {
logger := logp.NewLogger("add_cloud_metadata")
type IMDSClient interface {
GetInstanceIdentityDocument(ctx context.Context, params *imds.GetInstanceIdentityDocumentInput, optFns ...func(*imds.Options)) (*imds.GetInstanceIdentityDocumentOutput, error)
}

config := defaultConfig()
if err := c.Unpack(&config); err != nil {
logger.Warnf("error when load config for getting IMDSv2 token: %s. No token in the metadata request will be used.", err)
return ""
}
var NewIMDSClient func(cfg awssdk.Config) IMDSClient = func(cfg awssdk.Config) IMDSClient {
return imds.NewFromConfig(cfg)
}

tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS)
if err != nil {
logger.Warnf("error when load TLS config for getting IMDSv2 token: %s. No token in the metadata request will be used.", err)
return ""
}
type EC2Client interface {
DescribeTags(ctx context.Context, params *ec2.DescribeTagsInput, optFns ...func(*ec2.Options)) (*ec2.DescribeTagsOutput, error)
}

client := http.Client{
Timeout: config.Timeout,
Transport: &http.Transport{
DisableKeepAlives: true,
DialContext: (&net.Dialer{
Timeout: config.Timeout,
KeepAlive: 0,
}).DialContext,
TLSClientConfig: tlsConfig.ToConfig(),
},
}
var NewEC2Client func(cfg awssdk.Config) EC2Client = func(cfg awssdk.Config) EC2Client {
return ec2.NewFromConfig(cfg)
}

tokenReq, err := http.NewRequest("PUT", fmt.Sprintf("http://%s%s", metadataHost, ec2InstanceIMDSv2TokenURI), nil)
if err != nil {
logger.Warnf("error when make token request for getting IMDSv2 token: %s. No token in the metadata request will be used.", err)
return ""
}
// AWS EC2 Metadata Service
var ec2MetadataFetcher = provider{
Name: "aws-ec2",

tokenReq.Header.Add(ec2InstanceIMDSv2TokenTTLHeader, ec2InstanceIMDSv2TokenTTLValue)
rsp, err := client.Do(tokenReq)
if rsp == nil {
logger.Warnf("read token request for getting IMDSv2 token returns empty: %s. No token in the metadata request will be used.", err)
return ""
}
Local: true,

defer func(body io.ReadCloser) {
if body != nil {
body.Close()
Create: func(_ string, config *conf.C) (metadataFetcher, error) {
ec2Schema := func(m map[string]interface{}) mapstr.M {
m["service"] = mapstr.M{
"name": "EC2",
}
return mapstr.M{"cloud": m}
}
}(rsp.Body)

fetcher, err := newGenericMetadataFetcher(config, "aws", ec2Schema, fetchRawProviderMetadata)
return fetcher, err
},
}

// fetchRaw queries raw metadata from a hosting provider's metadata service.
func fetchRawProviderMetadata(
ctx context.Context,
client http.Client,
result *result,
) {
logger := logp.NewLogger("add_cloud_metadata")

// LoadDefaultConfig loads the EC2 role credentials
awsConfig, err := awscfg.LoadDefaultConfig(context.TODO(), awscfg.WithHTTPClient(&client))
if err != nil {
logger.Warnf("error when read token request for getting IMDSv2 token: %s. No token in the metadata request will be used.", err)
return ""
logger.Warnf("error loading AWS default configuration: %s.", err)
result.err = fmt.Errorf("failed loading AWS default configuration: %w", err)
return
}
awsClient := NewIMDSClient(awsConfig)

if rsp.StatusCode != http.StatusOK {
logger.Warnf("error when check request status for getting IMDSv2 token: http request status %d. No token in the metadata request will be used.", rsp.StatusCode)
return ""
instanceIdentity, err := awsClient.GetInstanceIdentityDocument(context.TODO(), &imds.GetInstanceIdentityDocumentInput{})
if err != nil {
logger.Warnf("error fetching EC2 Identity Document: %s.", err)
result.err = fmt.Errorf("failed fetching EC2 Identity Document: %w", err)
return
}

all, err := ioutil.ReadAll(rsp.Body)
// AWS Region must be set to be able to get EC2 Tags
awsRegion := instanceIdentity.InstanceIdentityDocument.Region
awsConfig.Region = awsRegion

clusterName, err := fetchEC2ClusterNameTag(awsConfig, instanceIdentity.InstanceIdentityDocument.InstanceID)
if err != nil {
logger.Warnf("error when reading token request for getting IMDSv2 token: %s. No token in the metadata request will be used.", err)
return ""
logger.Warnf("error fetching cluster name metadata: %s.", err)
}

return string(all)
}
accountID := instanceIdentity.InstanceIdentityDocument.AccountID

// AWS EC2 Metadata Service
var ec2MetadataFetcher = provider{
Name: "aws-ec2",
_, _ = result.metadata.Put("instance.id", instanceIdentity.InstanceIdentityDocument.InstanceID)
_, _ = result.metadata.Put("machine.type", instanceIdentity.InstanceIdentityDocument.InstanceType)
_, _ = result.metadata.Put("region", awsRegion)
_, _ = result.metadata.Put("availability_zone", instanceIdentity.InstanceIdentityDocument.AvailabilityZone)
_, _ = result.metadata.Put("account.id", accountID)
_, _ = result.metadata.Put("image.id", instanceIdentity.InstanceIdentityDocument.ImageID)

Local: true,
// for AWS cluster ID is used cluster ARN: arn:partition:service:region:account-id:resource-type/resource-id, example:
// arn:aws:eks:us-east-2:627286350134:cluster/cluster-name
if clusterName != "" {
clusterARN := fmt.Sprintf("arn:aws:eks:%s:%s:cluster/%v", awsRegion, accountID, clusterName)

Create: func(_ string, config *conf.C) (metadataFetcher, error) {
ec2Schema := func(m map[string]interface{}) mapstr.M {
m["serviceName"] = "EC2"
out, _ := s.Schema{
"instance": s.Object{"id": c.Str("instanceId")},
"machine": s.Object{"type": c.Str("instanceType")},
"region": c.Str("region"),
"availability_zone": c.Str("availabilityZone"),
"service": s.Object{
"name": c.Str("serviceName"),
},
"account": s.Object{"id": c.Str("accountId")},
"image": s.Object{"id": c.Str("imageId")},
}.Apply(m)
return mapstr.M{"cloud": out}
}
_, _ = result.metadata.Put("orchestrator.cluster.id", clusterARN)
_, _ = result.metadata.Put("orchestrator.cluster.name", clusterName)
}
}

headers := make(map[string]string, 1)
token := getIMDSv2Token(config)
if len(token) > 0 {
headers[ec2InstanceIMDSv2TokenValueHeader] = token
}
func fetchEC2ClusterNameTag(awsConfig awssdk.Config, instanceID string) (string, error) {
svc := NewEC2Client(awsConfig)
input := &ec2.DescribeTagsInput{
Filters: []types.Filter{
{
Name: awssdk.String("resource-id"),
Values: []string{
instanceID,
},
},
{
Name: awssdk.String("key"),
Values: []string{
"eks:cluster-name",
},
},
},
}

fetcher, err := newMetadataFetcher(config, "aws", headers, metadataHost, ec2Schema, ec2InstanceIdentityURI)
return fetcher, err
},
tagsResult, err := svc.DescribeTags(context.TODO(), input)
if err != nil {
return "", fmt.Errorf("error fetching EC2 Tags: %w", err)
}
if len(tagsResult.Tags) == 1 {
return *tagsResult.Tags[0].Value, nil
}
return "", nil
}
Loading

0 comments on commit 6c7cfb4

Please sign in to comment.