Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Target Allocator Support for Telegraf Based Prometheus Receiver #1394

Merged
merged 29 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9954a07
Basic Working Copy
okankoAMZ Oct 4, 2024
e8e509d
clean-up
okankoAMZ Oct 14, 2024
b67d022
labelled temporary function
okankoAMZ Oct 14, 2024
79e030f
added unit test and ran go fmt
okankoAMZ Oct 14, 2024
db10d75
fixed blocking channel issue with reloadConfig
okankoAMZ Oct 14, 2024
8275956
Merge branch 'target-allocator' into ta-collector
okankoAMZ Oct 14, 2024
bf0c65a
ran go mod tidy, go fmt, go lint
okankoAMZ Oct 14, 2024
1ad4aae
addressed comments
okankoAMZ Oct 14, 2024
7a5a026
Target Allocator Support for Native Prometheus Receiver (#1390)
okankoAMZ Oct 14, 2024
43bbf95
Revert "Target Allocator Support for Native Prometheus Receiver" (#1393)
okankoAMZ Oct 15, 2024
93a2ae0
fixed nil typo
okankoAMZ Oct 15, 2024
90bfd8f
Merge branch 'target-allocator' into ta-collector
okankoAMZ Oct 15, 2024
4550d56
changed error to warning for TA config load
okankoAMZ Oct 16, 2024
aa6a7b2
addressed comments
okankoAMZ Oct 16, 2024
d15be9c
addressed comments
okankoAMZ Oct 17, 2024
7692754
added reload config thread
okankoAMZ Oct 22, 2024
079b599
removed debugger functions
okankoAMZ Oct 22, 2024
d348f10
added extra unittests
okankoAMZ Oct 23, 2024
35b2594
changed to using filepath.join
okankoAMZ Oct 23, 2024
1cfff60
go fmt
okankoAMZ Oct 23, 2024
e0ccd20
addressed comments and added reload interval
okankoAMZ Oct 25, 2024
f200e4d
Changed 2 channel shutdown to 1 and added log level
okankoAMZ Oct 30, 2024
e3bb6cf
go mod update
okankoAMZ Nov 4, 2024
205603b
lint
okankoAMZ Nov 4, 2024
afca216
Merge branch 'main' into ta-collector
okankoAMZ Nov 11, 2024
c3a0db2
changed fmt to logger
okankoAMZ Nov 12, 2024
8b7824d
Merge branch 'main' into ta-collector
okankoAMZ Nov 13, 2024
fb0ea4b
fixed unit-tests
okankoAMZ Nov 13, 2024
b370d59
Merge branch 'main' into ta-collector
okankoAMZ Nov 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions plugins/inputs/prometheus/metrics_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@
package prometheus

import (
"os"
"path/filepath"
"testing"

kitlog "github.com/go-kit/log"
"github.com/prometheus/common/promlog"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -110,3 +115,57 @@ func Test_metricAppender_Commit(t *testing.T) {
}
assert.Equal(t, expected, *pmb[0])
}

func Test_loadConfigFromFileWithTargetAllocator(t *testing.T) {
os.Setenv("POD_NAME", "collector-1")
okankoAMZ marked this conversation as resolved.
Show resolved Hide resolved
defer os.Unsetenv("POD_NAME")
configFile := filepath.Join("testdata", "target_allocator.yaml")
logger := kitlog.NewLogfmtLogger(os.Stdout)
logLevel := promlog.AllowedLevel{}
logLevel.Set("DEBUG")
var reloadHandler = func(cfg *config.Config) error {
logger.Log("reloaded")
return nil
}
taManager := createTargetAllocatorManager(configFile, logger, &logLevel, nil, nil)
err := reloadConfig(configFile, logger, taManager, reloadHandler)
assert.NoError(t, err)
assert.True(t, taManager.enabled)
assert.Equal(t, taManager.config.TargetAllocator.CollectorID, "collector-1")
assert.Equal(t, taManager.config.TargetAllocator.TLSSetting.CAFile, DEFAULT_TLS_CA_FILE_PATH)

}
okankoAMZ marked this conversation as resolved.
Show resolved Hide resolved

func Test_loadConfigFromFileWithoutTargetAllocator(t *testing.T) {
os.Setenv("POD_NAME", "collector-1")
defer os.Unsetenv("POD_NAME")
configFile := filepath.Join("testdata", "base-k8.yaml")
logLevel := promlog.AllowedLevel{}
logLevel.Set("DEBUG")
logger := kitlog.NewLogfmtLogger(os.Stdout)
var reloadHandler = func(cfg *config.Config) error {
logger.Log("reloaded")
return nil
}
taManager := createTargetAllocatorManager(configFile, logger, &logLevel, nil, nil)
err := reloadConfig(configFile, logger, taManager, reloadHandler)
assert.NoError(t, err)
assert.False(t, taManager.enabled)

}
func Test_loadConfigFromFileEC2(t *testing.T) {
configFile := filepath.Join("testdata", "base-k8.yaml")
logger := kitlog.NewLogfmtLogger(os.Stdout)
logLevel := promlog.AllowedLevel{}
logLevel.Set("DEBUG")
var reloadHandler = func(cfg *config.Config) error {
logger.Log("reloaded")
return nil
}

taManager := createTargetAllocatorManager(configFile, logger, &logLevel, nil, nil)
err := reloadConfig(configFile, logger, taManager, reloadHandler)
assert.NoError(t, err)
assert.False(t, taManager.enabled)

}
104 changes: 70 additions & 34 deletions plugins/inputs/prometheus/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package prometheus

import (
"context"
"fmt"
"os"
"os/signal"
"runtime"
Expand Down Expand Up @@ -101,8 +102,6 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
cfg.configFile = configFilePath

logger := promlog.New(&cfg.promlogConfig)
//stdlog.SetOutput(log.NewStdlibAdapter(logger))
okankoAMZ marked this conversation as resolved.
Show resolved Hide resolved
//stdlog.Println("redirect std log")

klog.SetLogger(klogr.New().WithName("k8s_client_runtime").V(6))

Expand All @@ -116,8 +115,19 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
ctxScrape, cancelScrape = context.WithCancel(context.Background())
sdMetrics, _ = discovery.CreateAndRegisterSDMetrics(prometheus.DefaultRegisterer)
discoveryManagerScrape = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("scrape"))
scrapeManager, _ = scrape.NewManager(&scrape.Options{}, log.With(logger, "component", "scrape manager"), receiver, prometheus.DefaultRegisterer)

scrapeManager, _ = scrape.NewManager(&scrape.Options{}, log.With(logger, "component", "scrape manager"), receiver, prometheus.DefaultRegisterer)
taManager = createTargetAllocatorManager(configFilePath, log.With(logger, "component", "ta manager"), logLevel, scrapeManager, discoveryManagerScrape)
)

level.Info(logger).Log("msg", fmt.Sprintf("Target Allocator is %t", taManager.enabled))
okankoAMZ marked this conversation as resolved.
Show resolved Hide resolved
okankoAMZ marked this conversation as resolved.
Show resolved Hide resolved
//Setup Target Allocator Scrape Post Process Handler
taManager.AttachReloadConfigHandler(
func(prometheusConfig *config.Config) {
relabelScrapeConfigs(prometheusConfig, logger)
},
)

mth.SetScrapeManager(scrapeManager)

var reloaders = []func(cfg *config.Config) error{
Expand Down Expand Up @@ -151,7 +161,6 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
close(reloadReady.C)
})
}

var g run.Group
{
// Termination handler.
Expand Down Expand Up @@ -179,12 +188,13 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
// Scrape discovery manager.
g.Add(
func() error {
level.Info(logger).Log("msg", "Scrape discovery manager starting")
okankoAMZ marked this conversation as resolved.
Show resolved Hide resolved
err := discoveryManagerScrape.Run()
level.Info(logger).Log("msg", "Scrape discovery manager stopped")
level.Info(logger).Log("msg", "Scrape discovery manager stopped", "error", err)
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
level.Info(logger).Log("msg", "Stopping scrape discovery manager...", "error", err)
cancelScrape()
},
)
Expand All @@ -201,17 +211,35 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan

level.Info(logger).Log("msg", "start discovery")
err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
level.Info(logger).Log("msg", "Scrape manager stopped")
level.Info(logger).Log("msg", "Scrape manager stopped", "error", err)
return err
},
func(err error) {
// Scrape manager needs to be stopped before closing the local TSDB
// so that it doesn't try to write samples to a closed storage.
level.Info(logger).Log("msg", "Stopping scrape manager...")
level.Info(logger).Log("msg", "Stopping scrape manager...", "error", err)
scrapeManager.Stop()
},
)
}
{
// Target Allocator manager.
if taManager.enabled {
g.Add(
func() error {
// we wait until the config is fully loaded.
level.Info(logger).Log("msg", "start ta manager")
err := taManager.Run()
level.Info(logger).Log("msg", "ta manager stopped", "error", err)
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping ta manager...", "error", err)
taManager.Shutdown()
},
)
}
}
{
// Reload handler.

Expand All @@ -227,7 +255,7 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
for {
select {
case <-hup:
if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil {
if err := reloadConfig(cfg.configFile, logger, taManager, reloaders...); err != nil {
level.Error(logger).Log("msg", "Error reloading config", "err", err)
}

Expand Down Expand Up @@ -257,9 +285,11 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan

default:
}

if taManager.enabled {
<-taManager.taReadyCh
}
level.Info(logger).Log("msg", "handling config file")
if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil {
if err := reloadConfig(cfg.configFile, logger, taManager, reloaders...); err != nil {
return errors.Wrapf(err, "error loading config from %q", cfg.configFile)
}
level.Info(logger).Log("msg", "finish handling config file")
Expand Down Expand Up @@ -288,30 +318,11 @@ const (
savedScrapeNameLabel = "cwagent_saved_scrape_name" // just arbitrary name that end user won't override in relabel config
)

func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) (err error) {
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)
content, _ := os.ReadFile(filename)
text := string(content)
level.Debug(logger).Log("msg", "Prometheus configuration file", "value", text)

defer func() {
if err == nil {
configSuccess.Set(1)
configSuccessTime.SetToCurrentTime()
} else {
configSuccess.Set(0)
}
}()

conf, err := config.LoadFile(filename, false, false, logger)
if err != nil {
return errors.Wrapf(err, "couldn't load configuration (--config.file=%q)", filename)
}

func relabelScrapeConfigs(prometheusConfig *config.Config, logger log.Logger) {
// For saving name before relabel
// - __name__ https://github.com/aws/amazon-cloudwatch-agent/issues/190
// - job and instance https://github.com/aws/amazon-cloudwatch-agent/issues/193
for _, sc := range conf.ScrapeConfigs {
for _, sc := range prometheusConfig.ScrapeConfigs {
relabelConfigs := []*relabel.Config{
// job
{
Expand All @@ -331,13 +342,38 @@ func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config
},
}

level.Info(logger).Log("msg", "Add extra relabel_configs and metric_relabel_configs to save job, instance and __name__ before user relabel")
level.Debug(logger).Log("msg", "Add extra relabel_configs and metric_relabel_configs to save job, instance and __name__ before user relabel")

sc.RelabelConfigs = append(relabelConfigs, sc.RelabelConfigs...)
sc.MetricRelabelConfigs = append(metricNameRelabelConfigs, sc.MetricRelabelConfigs...)

}
}
func reloadConfig(filename string, logger log.Logger, taManager *TargetAllocatorManager, rls ...func(*config.Config) error) (err error) {
okankoAMZ marked this conversation as resolved.
Show resolved Hide resolved
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)
content, _ := os.ReadFile(filename)
text := string(content)
level.Debug(logger).Log("msg", "Prometheus configuration file", "value", text)

defer func() {
if err == nil {
configSuccess.Set(1)
configSuccessTime.SetToCurrentTime()
} else {
configSuccess.Set(0)
}
}()
// Check for TA
var conf *config.Config
if taManager.enabled {
level.Info(logger).Log("msg", "Target Allocator is enabled")
conf = (*config.Config)(taManager.config.PrometheusConfig)
} else {
conf, err = config.LoadFile(filename, false, false, logger)
if err != nil {
return errors.Wrapf(err, "couldn't load configuration (--config.file=%q)", filename)
}
}
relabelScrapeConfigs(conf, logger)
failed := false
for _, rl := range rls {
if err := rl(conf); err != nil {
Expand Down
Loading
Loading