Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AppSignals] Implement translation for appsignals on native k8s #1012

Merged
merged 7 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions plugins/processors/awsappsignals/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ func (cfg *Config) Validate() error {
if resolver.Name == "" {
return errors.New("name must not be empty for eks resolver")
}
case PlatformK8s:
if resolver.Name == "" {
return errors.New("name must not be empty for k8s resolver")
}
case PlatformGeneric:
default:
return errors.New("unknown resolver")
Expand Down
12 changes: 12 additions & 0 deletions plugins/processors/awsappsignals/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ func TestValidatePassed(t *testing.T) {
Rules: nil,
}
assert.Nil(t, config.Validate())

config = Config{
Resolvers: []Resolver{NewK8sResolver("test"), NewGenericResolver("")},
Rules: nil,
}
assert.Nil(t, config.Validate())
}

func TestValidateFailedOnEmptyResolver(t *testing.T) {
Expand All @@ -31,4 +37,10 @@ func TestValidateFailedOnEmptyClusterName(t *testing.T) {
Rules: nil,
}
assert.NotNil(t, config.Validate())

config = Config{
Resolvers: []Resolver{NewK8sResolver("")},
Rules: nil,
}
assert.NotNil(t, config.Validate())
}
9 changes: 9 additions & 0 deletions plugins/processors/awsappsignals/config/resolvers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ const (
PlatformGeneric = "generic"
// PlatformEKS Amazon EKS platform
PlatformEKS = "eks"
// PlatformK8s Native Kubernetes
PlatformK8s = "k8s"
)

type Resolver struct {
Expand All @@ -22,6 +24,13 @@ func NewEKSResolver(name string) Resolver {
}
}

func NewK8sResolver(name string) Resolver {
return Resolver{
Name: name,
Platform: PlatformK8s,
}
}

func NewGenericResolver(name string) Resolver {
return Resolver{
Name: name,
Expand Down
5 changes: 5 additions & 0 deletions plugins/processors/awsappsignals/config/resolvers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ func TestNewEKSResolver(t *testing.T) {
assert.Equal(t, "eks", resolver.Platform)
}

func TestK8sResolver(t *testing.T) {
resolver := NewK8sResolver("test")
assert.Equal(t, "k8s", resolver.Platform)
}

func TestNewGenericResolver(t *testing.T) {
resolver := NewGenericResolver("")
assert.Equal(t, "generic", resolver.Platform)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ const (
K8SRemoteNamespace = "K8s.RemoteNamespace"

// hosted in attribute names
HostedInClusterName = "HostedIn.EKS.Cluster"
HostedInK8SNamespace = "HostedIn.K8s.Namespace"
HostedInEnvironment = "HostedIn.Environment"
HostedInClusterNameEKS = "HostedIn.EKS.Cluster"
HostedInClusterNameK8s = "HostedIn.K8s.Cluster"
HostedInK8SNamespace = "HostedIn.K8s.Namespace"
HostedInEnvironment = "HostedIn.Environment"
)
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ type attributesResolver struct {

// create a new attributes resolver
func NewAttributesResolver(resolvers []appsignalsconfig.Resolver, logger *zap.Logger) *attributesResolver {
//TODO: Logic for native k8s needs to be implemented
subResolvers := []subResolver{}
for _, resolver := range resolvers {
if resolver.Platform == appsignalsconfig.PlatformEKS {
subResolvers = append(subResolvers, getEksResolver(logger), newEKSHostedInAttributeResolver(resolver.Name))
if resolver.Platform == appsignalsconfig.PlatformEKS || resolver.Platform == appsignalsconfig.PlatformK8s {
subResolvers = append(subResolvers, getKubernetesResolver(logger), newKubernetesHostedInAttributeResolver(resolver.Name))
} else {
subResolvers = append(subResolvers, newHostedInAttributeResolver(resolver.Name, DefaultHostedInAttributes))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/client-go/tools/clientcmd"

attr "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsappsignals/internal/attributes"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common"
)

const (
Expand Down Expand Up @@ -67,7 +68,7 @@ var (
replicaSetFromPodPattern = regexp.MustCompile(podWithReplicaSetNamePattern)
)

type eksResolver struct {
type kubernetesResolver struct {
logger *zap.Logger
clientset kubernetes.Interface
ipToPod *sync.Map
Expand All @@ -77,7 +78,7 @@ type eksResolver struct {
workloadAndNamespaceToLabels *sync.Map
serviceToWorkload *sync.Map // computed from serviceAndNamespaceToSelectors and workloadAndNamespaceToLabels every 1 min
workloadPodCount map[string]int
safeStopCh *safeChannel // trace and metric processors share the same eksResolver and might close the same channel separately
safeStopCh *safeChannel // trace and metric processors share the same kubernetesResolver and might close the same channel separately
}

// a safe channel which can be closed multiple times
Expand All @@ -100,7 +101,7 @@ func (sc *safeChannel) Close() {

var (
once sync.Once
instance *eksResolver
instance *kubernetesResolver
)

func jitterSleep(seconds int) {
Expand Down Expand Up @@ -508,7 +509,7 @@ func (m *ServiceToWorkloadMapper) Start(stopCh chan struct{}) {
}()
}

func getEksResolver(logger *zap.Logger) subResolver {
func getKubernetesResolver(logger *zap.Logger) subResolver {
once.Do(func() {
config, err := clientcmd.BuildConfigFromFlags("", "")
if err != nil {
Expand All @@ -517,7 +518,7 @@ func getEksResolver(logger *zap.Logger) subResolver {

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
logger.Fatal("Failed to create eks client", zap.Error(err))
logger.Fatal("Failed to create kubernetes client", zap.Error(err))
}

// jitter calls to the kubernetes api
Expand All @@ -543,7 +544,7 @@ func getEksResolver(logger *zap.Logger) subResolver {
serviceToWorkloadMapper := NewServiceToWorkloadMapper(serviceWatcher.serviceAndNamespaceToSelectors, podWatcher.workloadAndNamespaceToLabels, serviceToWorkload, logger, timedDeleter)
serviceToWorkloadMapper.Start(safeStopCh.ch)

instance = &eksResolver{
instance = &kubernetesResolver{
logger: logger,
clientset: clientset,
ipToServiceAndNamespace: serviceWatcher.ipToServiceAndNamespace,
Expand All @@ -560,13 +561,13 @@ func getEksResolver(logger *zap.Logger) subResolver {
return instance
}

func (e *eksResolver) Stop(_ context.Context) error {
func (e *kubernetesResolver) Stop(_ context.Context) error {
e.safeStopCh.Close()
return nil
}

// add a method to eksResolver
func (e *eksResolver) GetWorkloadAndNamespaceByIP(ip string) (string, string, error) {
// add a method to kubernetesResolver
func (e *kubernetesResolver) GetWorkloadAndNamespaceByIP(ip string) (string, string, error) {
var workload, namespace string
if podKey, ok := e.ipToPod.Load(ip); ok {
pod := podKey.(string)
Expand All @@ -584,10 +585,10 @@ func (e *eksResolver) GetWorkloadAndNamespaceByIP(ip string) (string, string, er
}
}

return "", "", errors.New("no EKS workload found for ip: " + ip)
return "", "", errors.New("no kubernetes workload found for ip: " + ip)
}

func (e *eksResolver) Process(attributes, resourceAttributes pcommon.Map) error {
func (e *kubernetesResolver) Process(attributes, resourceAttributes pcommon.Map) error {
if value, ok := attributes.Get(attr.AWSRemoteService); ok {
valueStr := value.AsString()
ipStr := ""
Expand Down Expand Up @@ -658,30 +659,35 @@ func getHostNetworkPorts(pod *corev1.Pod) []string {
return ports
}

type eksHostedInAttributeResolver struct {
type kubernetesHostedInAttributeResolver struct {
clusterName string
attributeMap map[string]string
}

func newEKSHostedInAttributeResolver(clusterName string) *eksHostedInAttributeResolver {
return &eksHostedInAttributeResolver{
func newKubernetesHostedInAttributeResolver(clusterName string) *kubernetesHostedInAttributeResolver {
return &kubernetesHostedInAttributeResolver{
clusterName: clusterName,
attributeMap: map[string]string{
semconv.AttributeK8SNamespaceName: attr.HostedInK8SNamespace,
},
}
}
func (h *eksHostedInAttributeResolver) Process(attributes, resourceAttributes pcommon.Map) error {
func (h *kubernetesHostedInAttributeResolver) Process(attributes, resourceAttributes pcommon.Map) error {
for attrKey, mappingKey := range h.attributeMap {
if val, ok := resourceAttributes.Get(attrKey); ok {
attributes.PutStr(mappingKey, val.AsString())
}
}

attributes.PutStr(attr.HostedInClusterName, h.clusterName)
if isEks, _ := common.IsEKS(); isEks {
attributes.PutStr(attr.HostedInClusterNameEKS, h.clusterName)
} else {
attributes.PutStr(attr.HostedInClusterNameK8s, h.clusterName)
}

return nil
}

func (h *eksHostedInAttributeResolver) Stop(ctx context.Context) error {
func (h *kubernetesHostedInAttributeResolver) Stop(ctx context.Context) error {
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

attr "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsappsignals/internal/attributes"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common"
)

// MockDeleter deletes a key immediately, useful for testing.
Expand Down Expand Up @@ -693,7 +694,7 @@ func TestEksResolver(t *testing.T) {
ctx := context.Background()

t.Run("Test GetWorkloadAndNamespaceByIP", func(t *testing.T) {
resolver := &eksResolver{
resolver := &kubernetesResolver{
logger: logger,
ipToPod: &sync.Map{},
podToWorkloadAndNamespace: &sync.Map{},
Expand All @@ -717,7 +718,7 @@ func TestEksResolver(t *testing.T) {

// Test non-existing IP
_, _, err = resolver.GetWorkloadAndNamespaceByIP("5.6.7.8")
if err == nil || !strings.Contains(err.Error(), "no EKS workload found for ip: 5.6.7.8") {
if err == nil || !strings.Contains(err.Error(), "no kubernetes workload found for ip: 5.6.7.8") {
t.Errorf("Expected error, got %v", err)
}

Expand All @@ -733,7 +734,7 @@ func TestEksResolver(t *testing.T) {
})

t.Run("Test Stop", func(t *testing.T) {
resolver := &eksResolver{
resolver := &kubernetesResolver{
logger: logger,
safeStopCh: &safeChannel{ch: make(chan struct{}), closed: false},
}
Expand Down Expand Up @@ -766,7 +767,7 @@ func TestEksResolver(t *testing.T) {
}

logger, _ := zap.NewProduction()
resolver := &eksResolver{
resolver := &kubernetesResolver{
logger: logger,
ipToPod: &sync.Map{},
podToWorkloadAndNamespace: &sync.Map{},
Expand Down Expand Up @@ -815,6 +816,7 @@ func TestEksResolver(t *testing.T) {
}

func TestHostedInEksResolver(t *testing.T) {
common.NewDetector = common.TestEKSDetector
// helper function to get string values from the attributes
getStrAttr := func(attributes pcommon.Map, key string, t *testing.T) string {
if value, ok := attributes.Get(key); ok {
Expand All @@ -825,7 +827,7 @@ func TestHostedInEksResolver(t *testing.T) {
}
}

resolver := newEKSHostedInAttributeResolver("test-cluster")
resolver := newKubernetesHostedInAttributeResolver("test-cluster")

// Test case 1 and 2: resourceAttributes contains "k8s.namespace.name" and EKS cluster name
attributes := pcommon.NewMap()
Expand All @@ -835,7 +837,7 @@ func TestHostedInEksResolver(t *testing.T) {
err := resolver.Process(attributes, resourceAttributes)
assert.NoError(t, err)
assert.Equal(t, "test-namespace-3", getStrAttr(attributes, attr.HostedInK8SNamespace, t))
assert.Equal(t, "test-cluster", getStrAttr(attributes, attr.HostedInClusterName, t))
assert.Equal(t, "test-cluster", getStrAttr(attributes, attr.HostedInClusterNameEKS, t))
}

func TestExtractIPPort(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[agent]
collection_jitter = "0s"
debug = false
flush_interval = "1s"
flush_jitter = "0s"
hostname = "host_name_from_env"
interval = "60s"
logfile = ""
logtarget = "lumberjack"
metric_batch_size = 1000
metric_buffer_limit = 10000
omit_hostname = false
precision = ""
quiet = false
round_interval = false

[inputs]

[outputs]

[[outputs.cloudwatchlogs]]
endpoint_override = "https://fake_endpoint"
force_flush_interval = "5s"
log_stream_name = "host_name_from_env"
region = "us-east-1"

[processors]
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"agent": {
"region": "us-east-1"
},
"logs": {
"metrics_collected": {
"app_signals": {
"hosted_in": "TestCluster"
},
"kubernetes": {
"cluster_name": "TestCluster",
"metrics_collection_interval": 30,
"disable_metric_extraction": true,
"enhanced_container_insights": false
}
},
"force_flush_interval": 5,
"endpoint_override":"https://fake_endpoint"
},
"traces": {
"traces_collected": {
"app_signals": {}
}
}
}
Loading