Skip to content

Commit

Permalink
[AWS] [EC2] enrich events with EC2 tags with add_cloud_metadata proce…
Browse files Browse the repository at this point in the history
…ssor (#41477)

* add support to extract ec2 tags from IMDS endpoint

Signed-off-by: Kavindu Dodanduwa <[email protected]>

* add dedicated tests for tag extractor

Signed-off-by: Kavindu Dodanduwa <[email protected]>

* expand test case and add documentation

Signed-off-by: Kavindu Dodanduwa <[email protected]>

* add changelog entry

Signed-off-by: Kavindu Dodanduwa <[email protected]>

* handle empty tags, add tests and close underlying body

Signed-off-by: Kavindu Dodanduwa <[email protected]>

* review change - use aws.tags as tag prefix

Signed-off-by: Kavindu Dodanduwa <[email protected]>

---------

Signed-off-by: Kavindu Dodanduwa <[email protected]>
  • Loading branch information
Kavindu-Dodan authored Nov 12, 2024
1 parent 734ae05 commit c878397
Show file tree
Hide file tree
Showing 5 changed files with 367 additions and 121 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]

*Libbeat*

- enrich events with EC2 tags in add_cloud_metadata processor {pull}41477[41477]


*Heartbeat*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ examples for each of the supported providers.

_AWS_

Metadata given below are extracted from https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-identity-documents.html[instance identity document],

[source,json]
-------------------------------------------------------------------------------
{
Expand All @@ -98,6 +100,22 @@ _AWS_
}
-------------------------------------------------------------------------------

If the EC2 instance has IMDS enabled and if tags are allowed through IMDS endpoint, the processor will further append tags in metadata.
Please refer official documentation on https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html[IMDS endpoint] for further details.

[source,json]
-------------------------------------------------------------------------------
{
"aws": {
"tags": {
"org" : "myOrg",
"owner": "userID"
}
}
}
-------------------------------------------------------------------------------


_Digital Ocean_

[source,json]
Expand Down
137 changes: 114 additions & 23 deletions libbeat/processors/add_cloud_metadata/provider_aws_ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ package add_cloud_metadata
import (
"context"
"fmt"
"io"
"net/http"
"strings"

"github.com/elastic/elastic-agent-libs/logp"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
awscfg "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials/ec2rolecreds"
"github.com/aws/aws-sdk-go-v2/feature/ec2/imds"
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/ec2/types"
Expand All @@ -35,7 +38,14 @@ import (
conf "github.com/elastic/elastic-agent-libs/config"
)

const (
eksClusterNameTagKey = "eks:cluster-name"
tagsCategory = "tags/instance"
tagPrefix = "aws.tags"
)

type IMDSClient interface {
ec2rolecreds.GetMetadataAPIClient
GetInstanceIdentityDocument(ctx context.Context, params *imds.GetInstanceIdentityDocumentInput, optFns ...func(*imds.Options)) (*imds.GetInstanceIdentityDocumentOutput, error)
}

Expand Down Expand Up @@ -90,30 +100,17 @@ func fetchRawProviderMetadata(
result.err = fmt.Errorf("failed loading AWS default configuration: %w", err)
return
}
awsClient := NewIMDSClient(awsConfig)

instanceIdentity, err := awsClient.GetInstanceIdentityDocument(context.TODO(), &imds.GetInstanceIdentityDocumentInput{})
imdsClient := NewIMDSClient(awsConfig)
instanceIdentity, err := imdsClient.GetInstanceIdentityDocument(ctx, &imds.GetInstanceIdentityDocumentInput{})
if err != nil {
result.err = fmt.Errorf("failed fetching EC2 Identity Document: %w", err)
return
}

// AWS Region must be set to be able to get EC2 Tags
awsRegion := instanceIdentity.InstanceIdentityDocument.Region
awsConfig.Region = awsRegion
accountID := instanceIdentity.InstanceIdentityDocument.AccountID

clusterName, err := fetchEC2ClusterNameTag(awsConfig, instanceIdentity.InstanceIdentityDocument.InstanceID)
if err != nil {
logger.Warnf("error fetching cluster name metadata: %s.", err)
} else if clusterName != "" {
// for AWS cluster ID is used cluster ARN: arn:partition:service:region:account-id:resource-type/resource-id, example:
// arn:aws:eks:us-east-2:627286350134:cluster/cluster-name
clusterARN := fmt.Sprintf("arn:aws:eks:%s:%s:cluster/%v", awsRegion, accountID, clusterName)

_, _ = result.metadata.Put("orchestrator.cluster.id", clusterARN)
_, _ = result.metadata.Put("orchestrator.cluster.name", clusterName)
}
instanceID := instanceIdentity.InstanceIdentityDocument.InstanceID

_, _ = result.metadata.Put("cloud.instance.id", instanceIdentity.InstanceIdentityDocument.InstanceID)
_, _ = result.metadata.Put("cloud.machine.type", instanceIdentity.InstanceIdentityDocument.InstanceType)
Expand All @@ -122,10 +119,106 @@ func fetchRawProviderMetadata(
_, _ = result.metadata.Put("cloud.account.id", accountID)
_, _ = result.metadata.Put("cloud.image.id", instanceIdentity.InstanceIdentityDocument.ImageID)

// AWS Region must be set to be able to get EC2 Tags
awsConfig.Region = awsRegion
tags := getTags(ctx, imdsClient, NewEC2Client(awsConfig), instanceID, logger)

if tags[eksClusterNameTagKey] != "" {
// for AWS cluster ID is used cluster ARN: arn:partition:service:region:account-id:resource-type/resource-id, example:
// arn:aws:eks:us-east-2:627286350134:cluster/cluster-name
clusterARN := fmt.Sprintf("arn:aws:eks:%s:%s:cluster/%v", awsRegion, accountID, tags[eksClusterNameTagKey])

_, _ = result.metadata.Put("orchestrator.cluster.id", clusterARN)
_, _ = result.metadata.Put("orchestrator.cluster.name", tags[eksClusterNameTagKey])
}

if len(tags) == 0 {
return
}

logger.Infof("Adding retrieved tags with key: %s", tagPrefix)
for k, v := range tags {
_, _ = result.metadata.Put(fmt.Sprintf("%s.%s", tagPrefix, k), v)
}
}

// getTags is a helper to extract EC2 tags. Internally it utilize multiple extraction methods.
func getTags(ctx context.Context, imdsClient IMDSClient, ec2Client EC2Client, instanceId string, logger *logp.Logger) map[string]string {
logger.Info("Extracting EC2 tags from IMDS endpoint")
tags, ok := getTagsFromIMDS(ctx, imdsClient, logger)
if ok {
return tags
}

logger.Info("Tag extraction from IMDS failed, fallback to DescribeTags API to obtain EKS cluster name.")
clusterName, err := clusterNameFromDescribeTag(ctx, ec2Client, instanceId)
if err != nil {
logger.Warnf("error obtaining cluster name: %v.", err)
return tags
}

if clusterName != "" {
tags[eksClusterNameTagKey] = clusterName
}
return tags
}

// getTagsFromIMDS is a helper to extract EC2 tags using instance metadata service.
// Note that this call could get throttled and currently does not implement a retry mechanism.
// See - https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-retrieval.html#instancedata-throttling
func getTagsFromIMDS(ctx context.Context, client IMDSClient, logger *logp.Logger) (tags map[string]string, ok bool) {
tags = make(map[string]string)

b, err := getMetadataHelper(ctx, client, tagsCategory, logger)
if err != nil {
logger.Warnf("error obtaining tags category: %v", err)
return tags, false
}

for _, tag := range strings.Split(string(b), "\n") {
tagPath := fmt.Sprintf("%s/%s", tagsCategory, tag)
b, err := getMetadataHelper(ctx, client, tagPath, logger)
if err != nil {
logger.Warnf("error extracting tag value of %s: %v", tag, err)
return tags, false
}

tagValue := string(b)
if tagValue == "" {
logger.Infof("Ignoring tag key %s as value is empty", tag)
continue
}

tags[tag] = tagValue
}

return tags, true
}

// getMetadataHelper performs the IMDS call for the given path and returns the response content after closing the underlying content reader.
func getMetadataHelper(ctx context.Context, client IMDSClient, path string, logger *logp.Logger) (content []byte, err error) {
metadata, err := client.GetMetadata(ctx, &imds.GetMetadataInput{Path: path})
if err != nil {
return nil, fmt.Errorf("error from IMDS metadata request: %w", err)
}

defer func(Content io.ReadCloser) {
err := Content.Close()
if err != nil {
logger.Warnf("error closing IMDS metadata response body: %v", err)
}
}(metadata.Content)

content, err = io.ReadAll(metadata.Content)
if err != nil {
return nil, fmt.Errorf("error extracting metadata from the IMDS response: %w", err)
}

return content, nil
}

func fetchEC2ClusterNameTag(awsConfig awssdk.Config, instanceID string) (string, error) {
svc := NewEC2Client(awsConfig)
// clusterNameFromDescribeTag is a helper to extract EKS cluster name using DescribeTag.
func clusterNameFromDescribeTag(ctx context.Context, ec2Client EC2Client, instanceID string) (string, error) {
input := &ec2.DescribeTagsInput{
Filters: []types.Filter{
{
Expand All @@ -135,15 +228,13 @@ func fetchEC2ClusterNameTag(awsConfig awssdk.Config, instanceID string) (string,
},
},
{
Name: awssdk.String("key"),
Values: []string{
"eks:cluster-name",
},
Name: awssdk.String("key"),
Values: []string{eksClusterNameTagKey},
},
},
}

tagsResult, err := svc.DescribeTags(context.TODO(), input)
tagsResult, err := ec2Client.DescribeTags(ctx, input)
if err != nil {
return "", fmt.Errorf("error fetching EC2 Tags: %w", err)
}
Expand Down
Loading

0 comments on commit c878397

Please sign in to comment.