Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated EKS Resource Detector to use Go Client for Kubernetes #813

Merged
merged 5 commits into from
Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Changed

- Supported minimum version of Go bumped from 1.14 to 1.15. (#787)
- EKS Resource Detector now use the Kubernetes Go client to obtain the ConfigMap. (#813)

### Removed

Expand Down
140 changes: 49 additions & 91 deletions detectors/aws/eks/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,51 +16,48 @@ package eks

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"regexp"
"strings"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/semconv"
)

const (
k8sSvcURL = "https://kubernetes.default.svc"
k8sTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"
k8sCertPath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
authConfigmapPath = "/api/v1/namespaces/kube-system/configmaps/aws-auth"
cwConfigmapPath = "/api/v1/namespaces/amazon-cloudwatch/configmaps/cluster-info"
authConfigmapNS = "kube-system"
authConfigmapName = "aws-auth"
cwConfigmapNS = "amazon-cloudwatch"
cwConfigmapName = "cluster-info"
defaultCgroupPath = "/proc/self/cgroup"
containerIDLength = 64
timeoutMillis = 2000
)

// detectorUtils is used for testing the resourceDetector by abstracting functions that rely on external systems.
type detectorUtils interface {
fileExists(filename string) bool
fetchString(httpMethod string, URL string) (string, error)
getConfigMap(ctx context.Context, namespace string, name string) (map[string]string, error)
getContainerID() (string, error)
}

// This struct will implement the detectorUtils interface
type eksDetectorUtils struct{}
type eksDetectorUtils struct {
clientset *kubernetes.Clientset
}

// resourceDetector for detecting resources running on Amazon EKS
type resourceDetector struct {
utils detectorUtils
}

// This struct will help unmarshal clustername from JSON response
type data struct {
ClusterName string `json:"cluster.name"`
err error
}

// Compile time assertion that resourceDetector implements the resource.Detector interface.
Expand All @@ -71,13 +68,17 @@ var _ detectorUtils = (*eksDetectorUtils)(nil)

// NewResourceDetector returns a resource detector that will detect AWS EKS resources.
func NewResourceDetector() resource.Detector {
return &resourceDetector{utils: eksDetectorUtils{}}
utils, err := newK8sDetectorUtils()
return &resourceDetector{utils: utils, err: err}
}

// Detect returns a Resource describing the Amazon EKS environment being run in.
func (detector *resourceDetector) Detect(ctx context.Context) (*resource.Resource, error) {
if detector.err != nil {
return nil, detector.err
}

isEks, err := isEKS(detector.utils)
isEks, err := isEKS(ctx, detector.utils)
if err != nil {
return nil, err
}
Expand All @@ -91,7 +92,7 @@ func (detector *resourceDetector) Detect(ctx context.Context) (*resource.Resourc
attributes := []attribute.KeyValue{}

// Get clusterName and append to attributes
clusterName, err := getClusterName(detector.utils)
clusterName, err := getClusterName(ctx, detector.utils)
if err != nil {
return nil, err
}
Expand All @@ -110,22 +111,38 @@ func (detector *resourceDetector) Detect(ctx context.Context) (*resource.Resourc

// Return new resource object with clusterName and containerID as attributes
return resource.NewWithAttributes(attributes...), nil

}

// isEKS checks if the current environment is running in EKS.
func isEKS(utils detectorUtils) (bool, error) {
func isEKS(ctx context.Context, utils detectorUtils) (bool, error) {
if !isK8s(utils) {
return false, nil
}

// Make HTTP GET request
awsAuth, err := utils.fetchString(http.MethodGet, k8sSvcURL+authConfigmapPath)
awsAuth, err := utils.getConfigMap(ctx, authConfigmapNS, authConfigmapName)
if err != nil {
return false, fmt.Errorf("isEks() error retrieving auth configmap: %w", err)
}

return awsAuth != "", nil
return awsAuth != nil, nil
}

// newK8sDetectorUtils creates the Kubernetes clientset
func newK8sDetectorUtils() (*eksDetectorUtils, error) {
// Get cluster configuration
confs, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("failed to create config: %w", err)
}

// Create clientset using generated configuration
clientset, err := kubernetes.NewForConfig(confs)
if err != nil {
return nil, fmt.Errorf("failed to create clientset for Kubernetes client")
}

return &eksDetectorUtils{clientset: clientset}, nil
}

// isK8s checks if the current environment is running in a Kubernetes environment
Expand All @@ -139,84 +156,24 @@ func (eksUtils eksDetectorUtils) fileExists(filename string) bool {
return err == nil && !info.IsDir()
}

// fetchString executes an HTTP request with a given HTTP Method and URL string.
func (eksUtils eksDetectorUtils) fetchString(httpMethod string, URL string) (string, error) {
request, err := http.NewRequest(httpMethod, URL, nil)
if err != nil {
return "", fmt.Errorf("failed to create new HTTP request with method=%s, URL=%s: %w", httpMethod, URL, err)
}

// Set HTTP request header with authentication credentials
authHeader, err := getK8sCredHeader()
// getConfigMap retrieves the configuration map from the k8s API
func (eksUtils eksDetectorUtils) getConfigMap(ctx context.Context, namespace string, name string) (map[string]string, error) {
cm, err := eksUtils.clientset.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return "", err
return nil, fmt.Errorf("failed to retrieve ConfigMap %s/%s: %w", namespace, name, err)
}
request.Header.Set("Authorization", authHeader)

// Get certificate
caCert, err := ioutil.ReadFile(k8sCertPath)
if err != nil {
return "", fmt.Errorf("failed to read file with path %s", k8sCertPath)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

// Set HTTP request timeout and add certificate
client := &http.Client{
Timeout: timeoutMillis * time.Millisecond,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: caCertPool,
},
},
}

response, err := client.Do(request)
if err != nil || response.StatusCode != http.StatusOK {
return "", fmt.Errorf("failed to execute HTTP request with method=%s, URL=%s, Status Code=%d: %w", httpMethod, URL, response.StatusCode, err)
}

// Retrieve response body from HTTP request
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return "", fmt.Errorf("failed to read response from HTTP request with method=%s, URL=%s: %w", httpMethod, URL, err)
}

return string(body), nil
}

// getK8sCredHeader retrieves the kubernetes credential information.
func getK8sCredHeader() (string, error) {
content, err := ioutil.ReadFile(k8sTokenPath)
if err != nil {
return "", fmt.Errorf("getK8sCredHeader() error: cannot read file with path %s", k8sTokenPath)
}

return "Bearer " + string(content), nil
return cm.Data, nil
}

// getClusterName retrieves the clusterName resource attribute
func getClusterName(utils detectorUtils) (string, error) {
resp, err := utils.fetchString("GET", k8sSvcURL+cwConfigmapPath)
func getClusterName(ctx context.Context, utils detectorUtils) (string, error) {
resp, err := utils.getConfigMap(ctx, cwConfigmapNS, cwConfigmapName)
if err != nil {
return "", fmt.Errorf("getClusterName() error: %w", err)
}

// parse JSON object returned from HTTP request
var respmap map[string]json.RawMessage
err = json.Unmarshal([]byte(resp), &respmap)
if err != nil {
return "", fmt.Errorf("getClusterName() error: cannot parse JSON: %w", err)
}
var d data
err = json.Unmarshal(respmap["data"], &d)
if err != nil {
return "", fmt.Errorf("getClusterName() error: cannot parse JSON: %w", err)
}

clusterName := d.ClusterName

return clusterName, nil
return resp["cluster.name"], nil
}

// getContainerID returns the containerID if currently running within a container.
Expand All @@ -226,6 +183,7 @@ func (eksUtils eksDetectorUtils) getContainerID() (string, error) {
return "", fmt.Errorf("getContainerID() error: cannot read file with path %s: %w", defaultCgroupPath, err)
}

// is this going to stop working with 1.20 when Docker is deprecated?
r, err := regexp.Compile(`^.*/docker/(.+)$`)
if err != nil {
return "", err
Expand Down
18 changes: 8 additions & 10 deletions detectors/aws/eks/detector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ func (detectorUtils *MockDetectorUtils) fileExists(filename string) bool {
return args.Bool(0)
}

// Mock function for fetchString()
func (detectorUtils *MockDetectorUtils) fetchString(httpMethod string, URL string) (string, error) {
args := detectorUtils.Called(httpMethod, URL)
return args.String(0), args.Error(1)
// Mock function for getConfigMap()
func (detectorUtils *MockDetectorUtils) getConfigMap(_ context.Context, namespace string, name string) (map[string]string, error) {
args := detectorUtils.Called(namespace, name)
return args.Get(0).(map[string]string), args.Error(1)
}

// Mock function for getContainerID()
Expand All @@ -51,14 +51,13 @@ func (detectorUtils *MockDetectorUtils) getContainerID() (string, error) {

// Tests EKS resource detector running in EKS environment
func TestEks(t *testing.T) {

detectorUtils := new(MockDetectorUtils)

// Mock functions and set expectations
detectorUtils.On("fileExists", k8sTokenPath).Return(true)
detectorUtils.On("fileExists", k8sCertPath).Return(true)
detectorUtils.On("fetchString", "GET", k8sSvcURL+authConfigmapPath).Return("not empty", nil)
detectorUtils.On("fetchString", "GET", k8sSvcURL+cwConfigmapPath).Return(`{"data":{"cluster.name":"my-cluster"}}`, nil)
detectorUtils.On("getConfigMap", authConfigmapNS, authConfigmapName).Return(map[string]string{"not": "nil"}, nil)
detectorUtils.On("getConfigMap", cwConfigmapNS, cwConfigmapName).Return(map[string]string{"cluster.name": "my-cluster"}, nil)
detectorUtils.On("getContainerID").Return("0123456789A", nil)

// Expected resource object
Expand All @@ -69,7 +68,7 @@ func TestEks(t *testing.T) {
expectedResource := resource.NewWithAttributes(eksResourceLabels...)

// Call EKS Resource detector to detect resources
eksResourceDetector := resourceDetector{detectorUtils}
eksResourceDetector := resourceDetector{utils: detectorUtils}
resourceObj, err := eksResourceDetector.Detect(context.Background())
require.NoError(t, err)

Expand All @@ -79,15 +78,14 @@ func TestEks(t *testing.T) {

// Tests EKS resource detector not running in EKS environment
func TestNotEKS(t *testing.T) {

detectorUtils := new(MockDetectorUtils)

k8sTokenPath := "/var/run/secrets/kubernetes.io/serviceaccount/token"

// Mock functions and set expectations
detectorUtils.On("fileExists", k8sTokenPath).Return(false)

detector := resourceDetector{detectorUtils}
detector := resourceDetector{utils: detectorUtils}
r, err := detector.Detect(context.Background())
require.NoError(t, err)
assert.Equal(t, resource.Empty(), r, "Resource object should be empty")
Expand Down
2 changes: 2 additions & 0 deletions detectors/aws/eks/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ require (
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/otel v0.20.0
go.opentelemetry.io/otel/sdk v0.20.0
k8s.io/apimachinery v0.21.1
k8s.io/client-go v0.21.1
)
Loading