Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Target Allocator Support for Native Prometheus Receiver #1390

Merged
merged 8 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ replace (
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver v0.0.0-20240903195955-5944792b593a
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsxrayreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awsxrayreceiver v0.0.0-20240903195955-5944792b593a
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jmxreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/jmxreceiver v0.0.0-20240903195955-5944792b593a
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.0.0-20240903195955-5944792b593a
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.0.0-20241014185443-e1d98080edaf
)

// Temporary fix, pending PR https://github.com/shirou/gopsutil/pull/957
Expand Down Expand Up @@ -171,6 +171,8 @@ require (
k8s.io/klog/v2 v2.120.1
)

require github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.103.0

require (
cloud.google.com/go v0.112.1 // indirect
cloud.google.com/go/compute/metadata v0.3.0 // indirect
Expand Down Expand Up @@ -313,7 +315,6 @@ require (
github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/hashstructure/v2 v2.0.2 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/moby/sys/mountinfo v0.6.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand Down Expand Up @@ -341,7 +342,6 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.103.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.103.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.103.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.103.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/opencontainers/runc v1.1.12 // indirect
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awsxrayr
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awsxrayreceiver v0.0.0-20240903195955-5944792b593a/go.mod h1:igQaQJt7eA/y3dZ2VLXVql+6k/ZXBgrAa2y9FrMMIKQ=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/jmxreceiver v0.0.0-20240903195955-5944792b593a h1:Dm0h8toX9zk+OmfsS9dUtOjTIFOoDiSEjyQmvfYbzoE=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/jmxreceiver v0.0.0-20240903195955-5944792b593a/go.mod h1:NfNPpD1AYb1Z8C/0iNtViQUmI8Nzxnr3Q4RY9EreG5c=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.0.0-20240903195955-5944792b593a h1:+ptQohy80eo09rVCEJA6RIeXCsykZMkRJz82XIbSFzI=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.0.0-20240903195955-5944792b593a/go.mod h1:FMPwht01yV4UvBAWkPFO/5jAkGFtfsmz1TRaoYgWeqU=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.0.0-20241014185443-e1d98080edaf h1:KO3qbWjqOuP7YkEl3gk1LM50xOt+oLmonnXhwEJRbSI=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.0.0-20241014185443-e1d98080edaf/go.mod h1:9nHckqPolfb6nJK3q6PtAO6JwMmhWT5iJItdzE0fCho=
github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9 h1:FXrPTd8Rdlc94dKccl7KPmdmIbVh/OjelJ8/vgMRzcQ=
github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9/go.mod h1:eliMa/PW+RDr2QLWRmLH1R1ZA4RInpmvOzDDXtaIZkc=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
Expand Down Expand Up @@ -1009,8 +1009,6 @@ github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eI
github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU=
github.com/mitchellh/go-testing-interface v1.14.1/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8=
github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4=
github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.4.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
Expand Down
21 changes: 21 additions & 0 deletions plugins/inputs/prometheus/metrics_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
package prometheus

import (
"os"
"path/filepath"
"testing"

kitlog "github.com/go-kit/log"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -110,3 +114,20 @@ func Test_metricAppender_Commit(t *testing.T) {
}
assert.Equal(t, expected, *pmb[0])
}

func Test_loadConfigFromFile(t *testing.T) {
os.Setenv("POD_NAME", "collector-1")
configFile := filepath.Join("testdata", "target_allocator.yaml")
logger := kitlog.NewLogfmtLogger(os.Stdout)
var reloader = func(cfg *config.Config) error {
logger.Log("reloaded")
return nil
}
taManager := createTargetAllocatorManager(configFile, logger, nil, nil)
err := reloadConfig(configFile, logger, taManager, reloader)
assert.NoError(t, err)
assert.True(t, taManager.enabled)
assert.Equal(t, taManager.config.TargetAllocator.CollectorID, "collector-1")
assert.Equal(t, taManager.config.TargetAllocator.TLSSetting.CAFile, DEFAULT_TLS_CA_FILE_PATH)

}
107 changes: 94 additions & 13 deletions plugins/inputs/prometheus/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,21 @@ package prometheus

import (
"context"
"encoding/json"
"fmt"
"os"
"os/signal"
"runtime"
"sync"
"syscall"
"time"

"github.com/prometheus/prometheus/discovery/targetgroup"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/run"
tamanager "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/targetallocator"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
v "github.com/prometheus/client_golang/prometheus/collectors/version"
Expand Down Expand Up @@ -79,6 +85,43 @@ func init() {
prometheus.MustRegister(v.NewCollector("prometheus"))
}

// @TODO: REMOVE BEFORE RELEASE
func debugScrapeManager(logger log.Logger, scrapeManager *scrape.Manager) {
for {
for key, targets := range scrapeManager.TargetsAll() {
level.Info(logger).Log("msg", "ScraperDebug", "key", key)
for _, target := range targets {
level.Info(logger).Log("msg", "ScraperDebug-Tar", "target", target.String(), "lastScrape", target.LastScrape())
}
}
time.Sleep(5 * time.Second)
}
}

// @TODO: REMOVE BEFORE RELEASE
func debugChannelWrapper(logger log.Logger, in <-chan map[string][]*targetgroup.Group, out chan<- map[string][]*targetgroup.Group) {
for {
select {
case val, ok := <-in:
if !ok {
// Input channel is closed, stop processing
return
}
// Print the received value from the input channel
jsonVal, err := json.MarshalIndent(val, "", " ")
if err != nil {
level.Info(logger).Log("Failed to marshal: %v", err)
continue
}
//Print the received value from the input channel in JSON format
fmt.Println("Received:", string(jsonVal))
//level.Info(logger).Log("Channels Received:", string(jsonVal))

// Send the value to the output channel
out <- val
}
}
}
func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan interface{}, wg *sync.WaitGroup, mth *metricsTypeHandler) {
logLevel := &promlog.AllowedLevel{}
logLevel.Set("info")
Expand All @@ -94,6 +137,7 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
cfg := struct {
configFile string
promlogConfig promlog.Config
taConfig tamanager.Config
okankoAMZ marked this conversation as resolved.
Show resolved Hide resolved
}{
promlogConfig: promlog.Config{Level: logLevel, Format: logFormat},
}
Expand All @@ -117,7 +161,10 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
sdMetrics, _ = discovery.CreateAndRegisterSDMetrics(prometheus.DefaultRegisterer)
discoveryManagerScrape = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("scrape"))
scrapeManager, _ = scrape.NewManager(&scrape.Options{}, log.With(logger, "component", "scrape manager"), receiver, prometheus.DefaultRegisterer)
taManager = createTargetAllocatorManager(configFilePath, log.With(logger, "component", "ta manager"), scrapeManager, discoveryManagerScrape)
)

level.Info(logger).Log("msg", fmt.Sprintf("Target Allocator is %t", taManager.enabled))
mth.SetScrapeManager(scrapeManager)

var reloaders = []func(cfg *config.Config) error{
Expand Down Expand Up @@ -151,7 +198,11 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
close(reloadReady.C)
})
}

// @TODO: REMOVE BEFORE RELEASE
//scrapeIn := make(chan map[string][]*targetgroup.Group)
//go debugChannelWrapper(logger, discoveryManagerScrape.SyncCh(), scrapeIn)
//go debugScrapeManager(logger, scrapeManager)
//----
var g run.Group
{
// Termination handler.
Expand Down Expand Up @@ -179,13 +230,15 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
// Scrape discovery manager.
g.Add(
func() error {
level.Info(logger).Log("msg", "Scrape discovery manager starting")
err := discoveryManagerScrape.Run()
level.Info(logger).Log("msg", "Scrape discovery manager stopped")
level.Info(logger).Log("msg", "Scrape discovery manager stopped", "error", err)
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
level.Info(logger).Log("msg", "Stopping scrape discovery manager...", "error", err)
cancelScrape()
taManager.dmLiveCh <- struct{}{}
},
)
}
Expand All @@ -201,17 +254,38 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan

level.Info(logger).Log("msg", "start discovery")
err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
level.Info(logger).Log("msg", "Scrape manager stopped")
//err := scrapeManager.Run(scrapeIn)
level.Info(logger).Log("msg", "Scrape manager stopped", "error", err)
return err
},
func(err error) {
// Scrape manager needs to be stopped before closing the local TSDB
// so that it doesn't try to write samples to a closed storage.
level.Info(logger).Log("msg", "Stopping scrape manager...")
level.Info(logger).Log("msg", "Stopping scrape manager...", "error", err)
scrapeManager.Stop()
taManager.smLiveCh <- struct{}{}
},
)
}
{
// Target Allocator manager.
if taManager.enabled {
g.Add(
func() error {

// we wait until the config is fully loaded.
level.Info(logger).Log("msg", "start ta manager")
err := taManager.Start()
level.Info(logger).Log("msg", "ta manager stopped", "error", err)
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping ta manager...", "error", err)
taManager.Shutdown()
},
)
}
}
{
// Reload handler.

Expand All @@ -227,7 +301,7 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
for {
select {
case <-hup:
if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil {
if err := reloadConfig(cfg.configFile, logger, taManager, reloaders...); err != nil {
level.Error(logger).Log("msg", "Error reloading config", "err", err)
}

Expand Down Expand Up @@ -258,8 +332,9 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
default:
}

<-taManager.taReadyCh
okankoAMZ marked this conversation as resolved.
Show resolved Hide resolved
level.Info(logger).Log("msg", "handling config file")
if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil {
if err := reloadConfig(cfg.configFile, logger, taManager, reloaders...); err != nil {
return errors.Wrapf(err, "error loading config from %q", cfg.configFile)
}
level.Info(logger).Log("msg", "finish handling config file")
Expand Down Expand Up @@ -288,7 +363,7 @@ const (
savedScrapeNameLabel = "cwagent_saved_scrape_name" // just arbitrary name that end user won't override in relabel config
)

func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) (err error) {
func reloadConfig(filename string, logger log.Logger, taManager *TargetAllocatorManager, rls ...func(*config.Config) error) (err error) {
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)
content, _ := os.ReadFile(filename)
text := string(content)
Expand All @@ -302,12 +377,18 @@ func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config
configSuccess.Set(0)
}
}()

conf, err := config.LoadFile(filename, false, false, logger)
if err != nil {
return errors.Wrapf(err, "couldn't load configuration (--config.file=%q)", filename)
// Check for TA
var conf *config.Config
if taManager.enabled {
level.Info(logger).Log("msg", "Target Allocator is enabled")
conf = (*config.Config)(taManager.config.PrometheusConfig)
} else {
conf, err = config.LoadFile(filename, false, false, logger)
if err != nil {
return errors.Wrapf(err, "couldn't load configuration (--config.file=%q)", filename)
}
}

level.Debug(logger).Log("config", conf)
// For saving name before relabel
// - __name__ https://github.com/aws/amazon-cloudwatch-agent/issues/190
// - job and instance https://github.com/aws/amazon-cloudwatch-agent/issues/193
Expand Down
Loading