Skip to content

Commit

Permalink
Revert "Target Allocator Support for Native Prometheus Receiver (#1390)"
Browse files Browse the repository at this point in the history
This reverts commit 7a5a026.
  • Loading branch information
okankoAMZ authored Oct 15, 2024
1 parent 7a5a026 commit d8a2ba3
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 302 deletions.
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,10 @@ replace (
replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor => github.com/amazon-contributing/opentelemetry-collector-contrib/processor/resourcedetectionprocessor v0.0.0-20241011214336-9ae2897e22a4

replace (
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver v0.0.0-20240903195955-5944792b593a
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsxrayreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awsxrayreceiver v0.0.0-20240903195955-5944792b593a
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jmxreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/jmxreceiver v0.0.0-20240903195955-5944792b593a
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.0.0-20241014185443-e1d98080edaf

github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver v0.0.0-20241011214336-9ae2897e22a4
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsxrayreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awsxrayreceiver v0.0.0-20241011214336-9ae2897e22a4
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jmxreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/jmxreceiver v0.0.0-20241011214336-9ae2897e22a4
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.0.0-20241011214336-9ae2897e22a4
)

// Temporary fix, pending PR https://github.com/shirou/gopsutil/pull/957
Expand Down Expand Up @@ -363,6 +362,7 @@ require (
github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/hashstructure/v2 v2.0.2 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/moby/sys/mountinfo v0.6.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand Down
18 changes: 10 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,14 @@ github.com/amazon-contributing/opentelemetry-collector-contrib/pkg/translator/pr
github.com/amazon-contributing/opentelemetry-collector-contrib/pkg/translator/prometheus v0.0.0-20241011214336-9ae2897e22a4/go.mod h1:21nuEQl7YYeLkVrGGvxPXkljqjR40teBCG5trGZ5LxM=
github.com/amazon-contributing/opentelemetry-collector-contrib/processor/resourcedetectionprocessor v0.0.0-20241011214336-9ae2897e22a4 h1:azy7wJo4gKVc1SnmYHUM44LhMd2JUDzYWxythEfBbcg=
github.com/amazon-contributing/opentelemetry-collector-contrib/processor/resourcedetectionprocessor v0.0.0-20241011214336-9ae2897e22a4/go.mod h1:TgRVPbdaFu8pWg4H5yCqVznsRVe8wQJRfEeUpduTKT8=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver v0.0.0-20240903195955-5944792b593a h1:0nr53ssW/mHIFKARQ15u/9IAAemvRLuInEcy3HL9FTU=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver v0.0.0-20240903195955-5944792b593a/go.mod h1:tvoacZuBVf9lftcH2O6rKjumRCFAy+ycjKk3QJFaFXQ=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awsxrayreceiver v0.0.0-20240903195955-5944792b593a h1:sgWS0V5kO1qa85ufNET4569VpQtf1UZ2ymzRcEols9M=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awsxrayreceiver v0.0.0-20240903195955-5944792b593a/go.mod h1:igQaQJt7eA/y3dZ2VLXVql+6k/ZXBgrAa2y9FrMMIKQ=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/jmxreceiver v0.0.0-20240903195955-5944792b593a h1:Dm0h8toX9zk+OmfsS9dUtOjTIFOoDiSEjyQmvfYbzoE=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/jmxreceiver v0.0.0-20240903195955-5944792b593a/go.mod h1:NfNPpD1AYb1Z8C/0iNtViQUmI8Nzxnr3Q4RY9EreG5c=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.0.0-20241014185443-e1d98080edaf h1:KO3qbWjqOuP7YkEl3gk1LM50xOt+oLmonnXhwEJRbSI=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.0.0-20241014185443-e1d98080edaf/go.mod h1:9nHckqPolfb6nJK3q6PtAO6JwMmhWT5iJItdzE0fCho=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver v0.0.0-20241011214336-9ae2897e22a4 h1:ln9RY3VfhCgaJPKjvJZIgTSX8tiHMYnUU9N3VAeKwSw=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver v0.0.0-20241011214336-9ae2897e22a4/go.mod h1:tvoacZuBVf9lftcH2O6rKjumRCFAy+ycjKk3QJFaFXQ=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awsxrayreceiver v0.0.0-20241011214336-9ae2897e22a4 h1:+Eyu2YbpbwVH5Fy41Zvzb/gZWYgQAUwfF7kWbs984Sg=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awsxrayreceiver v0.0.0-20241011214336-9ae2897e22a4/go.mod h1:igQaQJt7eA/y3dZ2VLXVql+6k/ZXBgrAa2y9FrMMIKQ=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/jmxreceiver v0.0.0-20241011214336-9ae2897e22a4 h1:7cJQuZGSjZJBVY+WKPRBlA6Ju3/cyVTr66LXaAZRbEA=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/jmxreceiver v0.0.0-20241011214336-9ae2897e22a4/go.mod h1:NfNPpD1AYb1Z8C/0iNtViQUmI8Nzxnr3Q4RY9EreG5c=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.0.0-20241011214336-9ae2897e22a4 h1:Htf+QCi901ngIVNMbxg8Xpu0CYfiVIdRx7b0JETJavM=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.0.0-20241011214336-9ae2897e22a4/go.mod h1:FMPwht01yV4UvBAWkPFO/5jAkGFtfsmz1TRaoYgWeqU=
github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9 h1:FXrPTd8Rdlc94dKccl7KPmdmIbVh/OjelJ8/vgMRzcQ=
github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9/go.mod h1:eliMa/PW+RDr2QLWRmLH1R1ZA4RInpmvOzDDXtaIZkc=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
Expand Down Expand Up @@ -1035,6 +1035,8 @@ github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eI
github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU=
github.com/mitchellh/go-testing-interface v1.14.1/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8=
github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4=
github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.4.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
Expand Down
21 changes: 0 additions & 21 deletions plugins/inputs/prometheus/metrics_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,8 @@
package prometheus

import (
"os"
"path/filepath"
"testing"

kitlog "github.com/go-kit/log"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -114,20 +110,3 @@ func Test_metricAppender_Commit(t *testing.T) {
}
assert.Equal(t, expected, *pmb[0])
}

func Test_loadConfigFromFile(t *testing.T) {
os.Setenv("POD_NAME", "collector-1")
configFile := filepath.Join("testdata", "target_allocator.yaml")
logger := kitlog.NewLogfmtLogger(os.Stdout)
var reloader = func(cfg *config.Config) error {
logger.Log("reloaded")
return nil
}
taManager := createTargetAllocatorManager(configFile, logger, nil, nil)
err := reloadConfig(configFile, logger, taManager, reloader)
assert.NoError(t, err)
assert.True(t, taManager.enabled)
assert.Equal(t, taManager.config.TargetAllocator.CollectorID, "collector-1")
assert.Equal(t, taManager.config.TargetAllocator.TLSSetting.CAFile, DEFAULT_TLS_CA_FILE_PATH)

}
109 changes: 16 additions & 93 deletions plugins/inputs/prometheus/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@ package prometheus

import (
"context"
"encoding/json"
"fmt"
"os"
"os/signal"
"runtime"
"sync"
"syscall"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -42,7 +39,6 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
_ "github.com/prometheus/prometheus/discovery/install"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
Expand Down Expand Up @@ -83,43 +79,6 @@ func init() {
prometheus.MustRegister(v.NewCollector("prometheus"))
}

// @TODO: REMOVE BEFORE RELEASE
func debugScrapeManager(logger log.Logger, scrapeManager *scrape.Manager) {
for {
for key, targets := range scrapeManager.TargetsAll() {
level.Info(logger).Log("msg", "ScraperDebug", "key", key)
for _, target := range targets {
level.Info(logger).Log("msg", "ScraperDebug-Tar", "target", target.String(), "lastScrape", target.LastScrape())
}
}
time.Sleep(5 * time.Second)
}
}

// @TODO: REMOVE BEFORE RELEASE
func debugChannelWrapper(logger log.Logger, in <-chan map[string][]*targetgroup.Group, out chan<- map[string][]*targetgroup.Group) {
for {
select {
case val, ok := <-in:
if !ok {
// Input channel is closed, stop processing
return
}
// Print the received value from the input channel
jsonVal, err := json.MarshalIndent(val, "", " ")
if err != nil {
level.Info(logger).Log("Failed to marshal: %v", err)
continue
}
//Print the received value from the input channel in JSON format
fmt.Println("Received:", string(jsonVal))
//level.Info(logger).Log("Channels Received:", string(jsonVal))

// Send the value to the output channel
out <- val
}
}
}
func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan interface{}, wg *sync.WaitGroup, mth *metricsTypeHandler) {
logLevel := &promlog.AllowedLevel{}
logLevel.Set("info")
Expand All @@ -142,6 +101,8 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
cfg.configFile = configFilePath

logger := promlog.New(&cfg.promlogConfig)
//stdlog.SetOutput(log.NewStdlibAdapter(logger))
//stdlog.Println("redirect std log")

klog.SetLogger(klogr.New().WithName("k8s_client_runtime").V(6))

Expand All @@ -156,10 +117,7 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
sdMetrics, _ = discovery.CreateAndRegisterSDMetrics(prometheus.DefaultRegisterer)
discoveryManagerScrape = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("scrape"))
scrapeManager, _ = scrape.NewManager(&scrape.Options{}, log.With(logger, "component", "scrape manager"), receiver, prometheus.DefaultRegisterer)
taManager = createTargetAllocatorManager(configFilePath, log.With(logger, "component", "ta manager"), scrapeManager, discoveryManagerScrape)
)

level.Info(logger).Log("msg", fmt.Sprintf("Target Allocator is %t", taManager.enabled))
mth.SetScrapeManager(scrapeManager)

var reloaders = []func(cfg *config.Config) error{
Expand Down Expand Up @@ -193,11 +151,7 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
close(reloadReady.C)
})
}
// @TODO: REMOVE BEFORE RELEASE
//scrapeIn := make(chan map[string][]*targetgroup.Group)
//go debugChannelWrapper(logger, discoveryManagerScrape.SyncCh(), scrapeIn)
//go debugScrapeManager(logger, scrapeManager)
//----

var g run.Group
{
// Termination handler.
Expand Down Expand Up @@ -225,15 +179,13 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
// Scrape discovery manager.
g.Add(
func() error {
level.Info(logger).Log("msg", "Scrape discovery manager starting")
err := discoveryManagerScrape.Run()
level.Info(logger).Log("msg", "Scrape discovery manager stopped", "error", err)
level.Info(logger).Log("msg", "Scrape discovery manager stopped")
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping scrape discovery manager...", "error", err)
level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
cancelScrape()
taManager.dmLiveCh <- struct{}{}
},
)
}
Expand All @@ -249,38 +201,17 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan

level.Info(logger).Log("msg", "start discovery")
err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
//err := scrapeManager.Run(scrapeIn)
level.Info(logger).Log("msg", "Scrape manager stopped", "error", err)
level.Info(logger).Log("msg", "Scrape manager stopped")
return err
},
func(err error) {
// Scrape manager needs to be stopped before closing the local TSDB
// so that it doesn't try to write samples to a closed storage.
level.Info(logger).Log("msg", "Stopping scrape manager...", "error", err)
level.Info(logger).Log("msg", "Stopping scrape manager...")
scrapeManager.Stop()
taManager.smLiveCh <- struct{}{}
},
)
}
{
// Target Allocator manager.
if taManager.enabled {
g.Add(
func() error {

// we wait until the config is fully loaded.
level.Info(logger).Log("msg", "start ta manager")
err := taManager.Start()
level.Info(logger).Log("msg", "ta manager stopped", "error", err)
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping ta manager...", "error", err)
taManager.Shutdown()
},
)
}
}
{
// Reload handler.

Expand All @@ -296,7 +227,7 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
for {
select {
case <-hup:
if err := reloadConfig(cfg.configFile, logger, taManager, reloaders...); err != nil {
if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil {
level.Error(logger).Log("msg", "Error reloading config", "err", err)
}

Expand Down Expand Up @@ -326,11 +257,9 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan

default:
}
if taManager.enabled {
<-taManager.taReadyCh
}

level.Info(logger).Log("msg", "handling config file")
if err := reloadConfig(cfg.configFile, logger, taManager, reloaders...); err != nil {
if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil {
return errors.Wrapf(err, "error loading config from %q", cfg.configFile)
}
level.Info(logger).Log("msg", "finish handling config file")
Expand Down Expand Up @@ -359,7 +288,7 @@ const (
savedScrapeNameLabel = "cwagent_saved_scrape_name" // just arbitrary name that end user won't override in relabel config
)

func reloadConfig(filename string, logger log.Logger, taManager *TargetAllocatorManager, rls ...func(*config.Config) error) (err error) {
func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) (err error) {
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)
content, _ := os.ReadFile(filename)
text := string(content)
Expand All @@ -373,18 +302,12 @@ func reloadConfig(filename string, logger log.Logger, taManager *TargetAllocator
configSuccess.Set(0)
}
}()
// Check for TA
var conf *config.Config
if taManager.enabled {
level.Info(logger).Log("msg", "Target Allocator is enabled")
conf = (*config.Config)(taManager.config.PrometheusConfig)
} else {
conf, err = config.LoadFile(filename, false, false, logger)
if err != nil {
return errors.Wrapf(err, "couldn't load configuration (--config.file=%q)", filename)
}

conf, err := config.LoadFile(filename, false, false, logger)
if err != nil {
return errors.Wrapf(err, "couldn't load configuration (--config.file=%q)", filename)
}
level.Debug(logger).Log("config", conf)

// For saving name before relabel
// - __name__ https://github.com/aws/amazon-cloudwatch-agent/issues/190
// - job and instance https://github.com/aws/amazon-cloudwatch-agent/issues/193
Expand Down
Loading

0 comments on commit d8a2ba3

Please sign in to comment.