Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Target Allocator Support for Native Prometheus Receiver #1390

Merged
merged 8 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ replace (
replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor => github.com/amazon-contributing/opentelemetry-collector-contrib/processor/resourcedetectionprocessor v0.0.0-20241011214336-9ae2897e22a4

replace (
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver v0.0.0-20241011214336-9ae2897e22a4
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsxrayreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awsxrayreceiver v0.0.0-20241011214336-9ae2897e22a4
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jmxreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/jmxreceiver v0.0.0-20241011214336-9ae2897e22a4
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.0.0-20241011214336-9ae2897e22a4
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver v0.0.0-20240903195955-5944792b593a
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsxrayreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awsxrayreceiver v0.0.0-20240903195955-5944792b593a
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jmxreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/jmxreceiver v0.0.0-20240903195955-5944792b593a
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.0.0-20241014185443-e1d98080edaf

)

// Temporary fix, pending PR https://github.com/shirou/gopsutil/pull/957
Expand Down Expand Up @@ -362,7 +363,6 @@ require (
github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/hashstructure/v2 v2.0.2 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/moby/sys/mountinfo v0.6.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand Down
18 changes: 8 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,14 @@ github.com/amazon-contributing/opentelemetry-collector-contrib/pkg/translator/pr
github.com/amazon-contributing/opentelemetry-collector-contrib/pkg/translator/prometheus v0.0.0-20241011214336-9ae2897e22a4/go.mod h1:21nuEQl7YYeLkVrGGvxPXkljqjR40teBCG5trGZ5LxM=
github.com/amazon-contributing/opentelemetry-collector-contrib/processor/resourcedetectionprocessor v0.0.0-20241011214336-9ae2897e22a4 h1:azy7wJo4gKVc1SnmYHUM44LhMd2JUDzYWxythEfBbcg=
github.com/amazon-contributing/opentelemetry-collector-contrib/processor/resourcedetectionprocessor v0.0.0-20241011214336-9ae2897e22a4/go.mod h1:TgRVPbdaFu8pWg4H5yCqVznsRVe8wQJRfEeUpduTKT8=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver v0.0.0-20241011214336-9ae2897e22a4 h1:ln9RY3VfhCgaJPKjvJZIgTSX8tiHMYnUU9N3VAeKwSw=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver v0.0.0-20241011214336-9ae2897e22a4/go.mod h1:tvoacZuBVf9lftcH2O6rKjumRCFAy+ycjKk3QJFaFXQ=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awsxrayreceiver v0.0.0-20241011214336-9ae2897e22a4 h1:+Eyu2YbpbwVH5Fy41Zvzb/gZWYgQAUwfF7kWbs984Sg=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awsxrayreceiver v0.0.0-20241011214336-9ae2897e22a4/go.mod h1:igQaQJt7eA/y3dZ2VLXVql+6k/ZXBgrAa2y9FrMMIKQ=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/jmxreceiver v0.0.0-20241011214336-9ae2897e22a4 h1:7cJQuZGSjZJBVY+WKPRBlA6Ju3/cyVTr66LXaAZRbEA=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/jmxreceiver v0.0.0-20241011214336-9ae2897e22a4/go.mod h1:NfNPpD1AYb1Z8C/0iNtViQUmI8Nzxnr3Q4RY9EreG5c=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.0.0-20241011214336-9ae2897e22a4 h1:Htf+QCi901ngIVNMbxg8Xpu0CYfiVIdRx7b0JETJavM=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.0.0-20241011214336-9ae2897e22a4/go.mod h1:FMPwht01yV4UvBAWkPFO/5jAkGFtfsmz1TRaoYgWeqU=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver v0.0.0-20240903195955-5944792b593a h1:0nr53ssW/mHIFKARQ15u/9IAAemvRLuInEcy3HL9FTU=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver v0.0.0-20240903195955-5944792b593a/go.mod h1:tvoacZuBVf9lftcH2O6rKjumRCFAy+ycjKk3QJFaFXQ=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awsxrayreceiver v0.0.0-20240903195955-5944792b593a h1:sgWS0V5kO1qa85ufNET4569VpQtf1UZ2ymzRcEols9M=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awsxrayreceiver v0.0.0-20240903195955-5944792b593a/go.mod h1:igQaQJt7eA/y3dZ2VLXVql+6k/ZXBgrAa2y9FrMMIKQ=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/jmxreceiver v0.0.0-20240903195955-5944792b593a h1:Dm0h8toX9zk+OmfsS9dUtOjTIFOoDiSEjyQmvfYbzoE=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/jmxreceiver v0.0.0-20240903195955-5944792b593a/go.mod h1:NfNPpD1AYb1Z8C/0iNtViQUmI8Nzxnr3Q4RY9EreG5c=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.0.0-20241014185443-e1d98080edaf h1:KO3qbWjqOuP7YkEl3gk1LM50xOt+oLmonnXhwEJRbSI=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.0.0-20241014185443-e1d98080edaf/go.mod h1:9nHckqPolfb6nJK3q6PtAO6JwMmhWT5iJItdzE0fCho=
github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9 h1:FXrPTd8Rdlc94dKccl7KPmdmIbVh/OjelJ8/vgMRzcQ=
github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9/go.mod h1:eliMa/PW+RDr2QLWRmLH1R1ZA4RInpmvOzDDXtaIZkc=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
Expand Down Expand Up @@ -1035,8 +1035,6 @@ github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eI
github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU=
github.com/mitchellh/go-testing-interface v1.14.1/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8=
github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4=
github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.4.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
Expand Down
21 changes: 21 additions & 0 deletions plugins/inputs/prometheus/metrics_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
package prometheus

import (
"os"
"path/filepath"
"testing"

kitlog "github.com/go-kit/log"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -110,3 +114,20 @@ func Test_metricAppender_Commit(t *testing.T) {
}
assert.Equal(t, expected, *pmb[0])
}

func Test_loadConfigFromFile(t *testing.T) {
os.Setenv("POD_NAME", "collector-1")
configFile := filepath.Join("testdata", "target_allocator.yaml")
logger := kitlog.NewLogfmtLogger(os.Stdout)
var reloader = func(cfg *config.Config) error {
logger.Log("reloaded")
return nil
}
taManager := createTargetAllocatorManager(configFile, logger, nil, nil)
err := reloadConfig(configFile, logger, taManager, reloader)
assert.NoError(t, err)
assert.True(t, taManager.enabled)
assert.Equal(t, taManager.config.TargetAllocator.CollectorID, "collector-1")
assert.Equal(t, taManager.config.TargetAllocator.TLSSetting.CAFile, DEFAULT_TLS_CA_FILE_PATH)

}
109 changes: 93 additions & 16 deletions plugins/inputs/prometheus/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ package prometheus

import (
"context"
"encoding/json"
"fmt"
"os"
"os/signal"
"runtime"
"sync"
"syscall"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -39,6 +42,7 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
_ "github.com/prometheus/prometheus/discovery/install"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
Expand Down Expand Up @@ -79,6 +83,43 @@ func init() {
prometheus.MustRegister(v.NewCollector("prometheus"))
}

// @TODO: REMOVE BEFORE RELEASE
func debugScrapeManager(logger log.Logger, scrapeManager *scrape.Manager) {
for {
for key, targets := range scrapeManager.TargetsAll() {
level.Info(logger).Log("msg", "ScraperDebug", "key", key)
for _, target := range targets {
level.Info(logger).Log("msg", "ScraperDebug-Tar", "target", target.String(), "lastScrape", target.LastScrape())
}
}
time.Sleep(5 * time.Second)
}
}

// @TODO: REMOVE BEFORE RELEASE
func debugChannelWrapper(logger log.Logger, in <-chan map[string][]*targetgroup.Group, out chan<- map[string][]*targetgroup.Group) {
for {
select {
case val, ok := <-in:
if !ok {
// Input channel is closed, stop processing
return
}
// Print the received value from the input channel
jsonVal, err := json.MarshalIndent(val, "", " ")
if err != nil {
level.Info(logger).Log("Failed to marshal: %v", err)
continue
}
//Print the received value from the input channel in JSON format
fmt.Println("Received:", string(jsonVal))
//level.Info(logger).Log("Channels Received:", string(jsonVal))

// Send the value to the output channel
out <- val
}
}
}
func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan interface{}, wg *sync.WaitGroup, mth *metricsTypeHandler) {
logLevel := &promlog.AllowedLevel{}
logLevel.Set("info")
Expand All @@ -101,8 +142,6 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
cfg.configFile = configFilePath

logger := promlog.New(&cfg.promlogConfig)
//stdlog.SetOutput(log.NewStdlibAdapter(logger))
//stdlog.Println("redirect std log")

klog.SetLogger(klogr.New().WithName("k8s_client_runtime").V(6))

Expand All @@ -117,7 +156,10 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
sdMetrics, _ = discovery.CreateAndRegisterSDMetrics(prometheus.DefaultRegisterer)
discoveryManagerScrape = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("scrape"))
scrapeManager, _ = scrape.NewManager(&scrape.Options{}, log.With(logger, "component", "scrape manager"), receiver, prometheus.DefaultRegisterer)
taManager = createTargetAllocatorManager(configFilePath, log.With(logger, "component", "ta manager"), scrapeManager, discoveryManagerScrape)
)

level.Info(logger).Log("msg", fmt.Sprintf("Target Allocator is %t", taManager.enabled))
mth.SetScrapeManager(scrapeManager)

var reloaders = []func(cfg *config.Config) error{
Expand Down Expand Up @@ -151,7 +193,11 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
close(reloadReady.C)
})
}

// @TODO: REMOVE BEFORE RELEASE
//scrapeIn := make(chan map[string][]*targetgroup.Group)
//go debugChannelWrapper(logger, discoveryManagerScrape.SyncCh(), scrapeIn)
//go debugScrapeManager(logger, scrapeManager)
//----
var g run.Group
{
// Termination handler.
Expand Down Expand Up @@ -179,13 +225,15 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
// Scrape discovery manager.
g.Add(
func() error {
level.Info(logger).Log("msg", "Scrape discovery manager starting")
err := discoveryManagerScrape.Run()
level.Info(logger).Log("msg", "Scrape discovery manager stopped")
level.Info(logger).Log("msg", "Scrape discovery manager stopped", "error", err)
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
level.Info(logger).Log("msg", "Stopping scrape discovery manager...", "error", err)
cancelScrape()
taManager.dmLiveCh <- struct{}{}
},
)
}
Expand All @@ -201,17 +249,38 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan

level.Info(logger).Log("msg", "start discovery")
err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
level.Info(logger).Log("msg", "Scrape manager stopped")
//err := scrapeManager.Run(scrapeIn)
level.Info(logger).Log("msg", "Scrape manager stopped", "error", err)
return err
},
func(err error) {
// Scrape manager needs to be stopped before closing the local TSDB
// so that it doesn't try to write samples to a closed storage.
level.Info(logger).Log("msg", "Stopping scrape manager...")
level.Info(logger).Log("msg", "Stopping scrape manager...", "error", err)
scrapeManager.Stop()
taManager.smLiveCh <- struct{}{}
},
)
}
{
// Target Allocator manager.
if taManager.enabled {
g.Add(
func() error {

// we wait until the config is fully loaded.
level.Info(logger).Log("msg", "start ta manager")
err := taManager.Start()
level.Info(logger).Log("msg", "ta manager stopped", "error", err)
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping ta manager...", "error", err)
taManager.Shutdown()
},
)
}
}
{
// Reload handler.

Expand All @@ -227,7 +296,7 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
for {
select {
case <-hup:
if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil {
if err := reloadConfig(cfg.configFile, logger, taManager, reloaders...); err != nil {
level.Error(logger).Log("msg", "Error reloading config", "err", err)
}

Expand Down Expand Up @@ -257,9 +326,11 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan

default:
}

if taManager.enabled {
<-taManager.taReadyCh
}
level.Info(logger).Log("msg", "handling config file")
if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil {
if err := reloadConfig(cfg.configFile, logger, taManager, reloaders...); err != nil {
return errors.Wrapf(err, "error loading config from %q", cfg.configFile)
}
level.Info(logger).Log("msg", "finish handling config file")
Expand Down Expand Up @@ -288,7 +359,7 @@ const (
savedScrapeNameLabel = "cwagent_saved_scrape_name" // just arbitrary name that end user won't override in relabel config
)

func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) (err error) {
func reloadConfig(filename string, logger log.Logger, taManager *TargetAllocatorManager, rls ...func(*config.Config) error) (err error) {
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)
content, _ := os.ReadFile(filename)
text := string(content)
Expand All @@ -302,12 +373,18 @@ func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config
configSuccess.Set(0)
}
}()

conf, err := config.LoadFile(filename, false, false, logger)
if err != nil {
return errors.Wrapf(err, "couldn't load configuration (--config.file=%q)", filename)
// Check for TA
var conf *config.Config
if taManager.enabled {
level.Info(logger).Log("msg", "Target Allocator is enabled")
conf = (*config.Config)(taManager.config.PrometheusConfig)
} else {
conf, err = config.LoadFile(filename, false, false, logger)
if err != nil {
return errors.Wrapf(err, "couldn't load configuration (--config.file=%q)", filename)
}
}

level.Debug(logger).Log("config", conf)
// For saving name before relabel
// - __name__ https://github.com/aws/amazon-cloudwatch-agent/issues/190
// - job and instance https://github.com/aws/amazon-cloudwatch-agent/issues/193
Expand Down
Loading