From 8883c56d9eda7cda8e19296749083ef26dc991b4 Mon Sep 17 00:00:00 2001
From: Jacob Aronoff
Date: Fri, 23 Sep 2022 15:31:01 -0400
Subject: [PATCH 01/15] Use the new scrape config endpoint

---
 .../prometheusreceiver/metrics_receiver.go   | 42 ++++++++++---------
 1 file changed, 22 insertions(+), 20 deletions(-)

diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go
index 098abdb77019..56a284ede14e 100644
--- a/receiver/prometheusreceiver/metrics_receiver.go
+++ b/receiver/prometheusreceiver/metrics_receiver.go
@@ -16,8 +16,9 @@ package prometheusreceiver // import "github.com/open-telemetry/opentelemetry-co

 import (
 	"context"
-	"encoding/json"
 	"fmt"
+	"gopkg.in/yaml.v2"
+	"io/ioutil"
 	"net/http"
 	"net/url"
 	"regexp"
@@ -115,13 +116,13 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error {
 // baseDiscoveryCfg can be used to provide additional ScrapeConfigs which will be added to the retrieved jobs.
 func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAllocator, baseCfg *config.Config) (uint64, error) {
 	r.settings.Logger.Debug("Syncing target allocator jobs")
-	jobObject, err := getJobResponse(allocConf.Endpoint)
+	scrapeConfigsResponse, err := getScrapeConfigsResponse(allocConf.Endpoint)
 	if err != nil {
 		r.settings.Logger.Error("Failed to retrieve job list", zap.Error(err))
 		return 0, err
 	}

-	hash, err := hashstructure.Hash(jobObject, hashstructure.FormatV2, nil)
+	hash, err := hashstructure.Hash(scrapeConfigsResponse, hashstructure.FormatV2, nil)
 	if err != nil {
 		r.settings.Logger.Error("Failed to hash job list", zap.Error(err))
 		return 0, err
@@ -133,24 +134,20 @@ func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAll

 	cfg := *baseCfg

-	for _, linkJSON := range *jobObject {
+	for jobName, scrapeConfig := range scrapeConfigsResponse {
 		var httpSD promHTTP.SDConfig
 		if allocConf.HTTPSDConfig == nil {
 			httpSD = promHTTP.SDConfig{}
 		} else {
 			httpSD = *allocConf.HTTPSDConfig
 		}
-
-		httpSD.URL = fmt.Sprintf("%s%s?collector_id=%s", allocConf.Endpoint, linkJSON.Link, allocConf.CollectorID)
-
-		scrapeCfg := &config.ScrapeConfig{
-			JobName: linkJSON.Link,
-			ServiceDiscoveryConfigs: discovery.Configs{
-				&httpSD,
-			},
+		escapedJob := url.QueryEscape(jobName)
+		httpSD.URL = fmt.Sprintf("%s/jobs/%s/targets?collector_id=%s", allocConf.Endpoint, escapedJob, allocConf.CollectorID)
+		scrapeConfig.ServiceDiscoveryConfigs = discovery.Configs{
+			&httpSD,
 		}

-		cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, scrapeCfg)
+		cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, scrapeConfig)
 	}

 	err = r.applyCfg(&cfg)
@@ -162,26 +159,30 @@ func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAll
 	return hash, nil
 }

-func getJobResponse(baseURL string) (*map[string]linkJSON, error) {
-	jobURLString := fmt.Sprintf("%s/jobs", baseURL)
-	_, err := url.Parse(jobURLString) // check if valid
+func getScrapeConfigsResponse(baseURL string) (map[string]*config.ScrapeConfig, error) {
+	scrapeConfigsURL := fmt.Sprintf("%s/scrape_configs", baseURL)
+	_, err := url.Parse(scrapeConfigsURL) // check if valid
 	if err != nil {
 		return nil, err
 	}
-	resp, err := http.Get(jobURLString) //nolint
+	resp, err := http.Get(scrapeConfigsURL) //nolint
 	if err != nil {
 		return nil, err
 	}
 	defer resp.Body.Close()

-	jobObject := &map[string]linkJSON{}
-	err = json.NewDecoder(resp.Body).Decode(jobObject)
+	body, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return nil, err
+	}
+	jobToScrapeConfig := 
map[string]*config.ScrapeConfig{} + err = yaml.Unmarshal(body, &jobToScrapeConfig) if err != nil { return nil, err } - return jobObject, nil + return jobToScrapeConfig, nil } func (r *pReceiver) applyCfg(cfg *config.Config) error { @@ -192,6 +193,7 @@ func (r *pReceiver) applyCfg(cfg *config.Config) error { discoveryCfg := make(map[string]discovery.Configs) for _, scrapeConfig := range cfg.ScrapeConfigs { discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs + r.settings.Logger.Info("Scrape job added", zap.String("jobName", scrapeConfig.JobName)) } if err := r.discoveryManager.ApplyConfig(discoveryCfg); err != nil { return err From ec9364544334fa86463c2837e5133f6fb161307c Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Tue, 27 Sep 2022 13:07:03 -0400 Subject: [PATCH 02/15] Receiver fix --- .tool-versions | 1 + .../prometheusreceiver/metrics_receiver.go | 45 ++++++++++++++++--- 2 files changed, 41 insertions(+), 5 deletions(-) create mode 100644 .tool-versions diff --git a/.tool-versions b/.tool-versions new file mode 100644 index 000000000000..00c8508d038e --- /dev/null +++ b/.tool-versions @@ -0,0 +1 @@ +golang 1.18.3 diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go index 56a284ede14e..78b0c75a1e92 100644 --- a/receiver/prometheusreceiver/metrics_receiver.go +++ b/receiver/prometheusreceiver/metrics_receiver.go @@ -17,6 +17,7 @@ package prometheusreceiver // import "github.com/open-telemetry/opentelemetry-co import ( "context" "fmt" + "github.com/prometheus/prometheus/discovery/targetgroup" "gopkg.in/yaml.v2" "io/ioutil" "net/http" @@ -105,6 +106,16 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error { continue } savedHash = hash + //for jobName, targets := range r.scrapeManager.TargetsAll() { + // for _, target := range targets { + // r.settings.Logger.Info("target info", zap.String("job", jobName), + // zap.String("url", target.String()), + // zap.String("labels", target.Labels().String())) + // } + //} + //for _, provider := range r.discoveryManager.Providers() { + // r.settings.Logger.Info("discovery provider info", zap.Any("conf", provider.Config())) + //} } }() } @@ -116,7 +127,7 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error { // baseDiscoveryCfg can be used to provide additional ScrapeConfigs which will be added to the retrieved jobs. 
func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAllocator, baseCfg *config.Config) (uint64, error) { r.settings.Logger.Debug("Syncing target allocator jobs") - scrapeConfigsResponse, err := getScrapeConfigsResponse(allocConf.Endpoint) + scrapeConfigsResponse, err := r.getScrapeConfigsResponse(allocConf.Endpoint) if err != nil { r.settings.Logger.Error("Failed to retrieve job list", zap.Error(err)) return 0, err @@ -143,6 +154,7 @@ func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAll } escapedJob := url.QueryEscape(jobName) httpSD.URL = fmt.Sprintf("%s/jobs/%s/targets?collector_id=%s", allocConf.Endpoint, escapedJob, allocConf.CollectorID) + httpSD.HTTPClientConfig.FollowRedirects = false scrapeConfig.ServiceDiscoveryConfigs = discovery.Configs{ &httpSD, } @@ -159,7 +171,7 @@ func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAll return hash, nil } -func getScrapeConfigsResponse(baseURL string) (map[string]*config.ScrapeConfig, error) { +func (r *pReceiver) getScrapeConfigsResponse(baseURL string) (map[string]*config.ScrapeConfig, error) { scrapeConfigsURL := fmt.Sprintf("%s/scrape_configs", baseURL) _, err := url.Parse(scrapeConfigsURL) // check if valid if err != nil { @@ -172,16 +184,21 @@ func getScrapeConfigsResponse(baseURL string) (map[string]*config.ScrapeConfig, } defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, err } + r.settings.Logger.Info("body", zap.String("body", string(body))) jobToScrapeConfig := map[string]*config.ScrapeConfig{} err = yaml.Unmarshal(body, &jobToScrapeConfig) if err != nil { return nil, err } + for jobName, scrapeConfig := range jobToScrapeConfig { + for _, relabelConfig := range scrapeConfig.RelabelConfigs { + r.settings.Logger.Info("relabel config", zap.String("job", jobName), zap.Any("relabel", *relabelConfig), zap.String("regex", relabelConfig.Regex.String())) + } + } return jobToScrapeConfig, nil } @@ -193,7 +210,10 @@ func (r *pReceiver) applyCfg(cfg *config.Config) error { discoveryCfg := make(map[string]discovery.Configs) for _, scrapeConfig := range cfg.ScrapeConfigs { discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs - r.settings.Logger.Info("Scrape job added", zap.String("jobName", scrapeConfig.JobName)) + //r.settings.Logger.Info("Scrape job added", zap.String("jobName", scrapeConfig.JobName)) + r.settings.Logger.Info("Scrape job added", zap.String("jobName", scrapeConfig.JobName), + zap.Any("conf", *scrapeConfig), + ) } if err := r.discoveryManager.ApplyConfig(discoveryCfg); err != nil { return err @@ -230,8 +250,23 @@ func (r *pReceiver) initPrometheusComponents(ctx context.Context, host component r.cfg.PrometheusConfig.GlobalConfig.ExternalLabels, ) r.scrapeManager = scrape.NewManager(&scrape.Options{PassMetadataInContext: true}, logger, store) + + c := make(chan map[string][]*targetgroup.Group) + go func() { + for { + select { + case m := <-r.discoveryManager.SyncCh(): + for job, groups := range m { + for _, group := range groups { + r.settings.Logger.Info("target group info", zap.String("job", job), zap.Any("l", group.Labels), zap.Any("t", group.Targets)) + } + } + c <- m + } + } + }() go func() { - if err := r.scrapeManager.Run(r.discoveryManager.SyncCh()); err != nil { + if err := r.scrapeManager.Run(c); err != nil { r.settings.Logger.Error("Scrape manager failed", zap.Error(err)) host.ReportFatalError(err) } From 9755c105459ca01825d7d11e0db61561d79146c2 Mon Sep 17 00:00:00 2001 From: Jacob 
Aronoff Date: Wed, 28 Sep 2022 17:51:56 -0400 Subject: [PATCH 03/15] No more wacky hacky stuff --- receiver/prometheusreceiver/config.go | 4 +- .../prometheusreceiver/metrics_receiver.go | 137 ++++++++++-------- 2 files changed, 77 insertions(+), 64 deletions(-) diff --git a/receiver/prometheusreceiver/config.go b/receiver/prometheusreceiver/config.go index d479af48f1f0..c99a8369c65d 100644 --- a/receiver/prometheusreceiver/config.go +++ b/receiver/prometheusreceiver/config.go @@ -165,8 +165,8 @@ func (cfg *Config) Validate() error { } func (cfg *Config) validatePromConfig(promConfig *promconfig.Config) error { - if len(promConfig.ScrapeConfigs) == 0 { - return errors.New("no Prometheus scrape_configs") + if len(promConfig.ScrapeConfigs) == 0 && cfg.TargetAllocator == nil { + return errors.New("no Prometheus scrape_configs or target_allocator set") } // Reject features that Prometheus supports but that the receiver doesn't support: diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go index 78b0c75a1e92..ab440c24872f 100644 --- a/receiver/prometheusreceiver/metrics_receiver.go +++ b/receiver/prometheusreceiver/metrics_receiver.go @@ -15,14 +15,18 @@ package prometheusreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver" import ( + "bytes" "context" "fmt" - "github.com/prometheus/prometheus/discovery/targetgroup" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/relabel" "gopkg.in/yaml.v2" "io/ioutil" "net/http" "net/url" + "os" "regexp" + "sync" "time" "github.com/go-kit/log" @@ -43,11 +47,18 @@ const ( gcIntervalDelta = 1 * time.Minute ) +type closeOnce struct { + C chan struct{} + once sync.Once + Close func() +} + // pReceiver is the type that provides Prometheus scraper/receiver functionality. type pReceiver struct { - cfg *Config - consumer consumer.Metrics - cancelFunc context.CancelFunc + cfg *Config + consumer consumer.Metrics + cancelFunc context.CancelFunc + reloadReady *closeOnce settings component.ReceiverCreateSettings scrapeManager *scrape.Manager @@ -61,10 +72,19 @@ type linkJSON struct { // New creates a new prometheus.Receiver reference. 
func newPrometheusReceiver(set component.ReceiverCreateSettings, cfg *Config, next consumer.Metrics) *pReceiver { + reloadReady := &closeOnce{ + C: make(chan struct{}), + } + reloadReady.Close = func() { + reloadReady.once.Do(func() { + close(reloadReady.C) + }) + } pr := &pReceiver{ - cfg: cfg, - consumer: next, - settings: set, + cfg: cfg, + consumer: next, + settings: set, + reloadReady: reloadReady, } return pr } @@ -94,35 +114,33 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error { allocConf := r.cfg.TargetAllocator if allocConf != nil { - go func() { - // immediately sync jobs and not wait for the first tick - savedHash, _ := r.syncTargetAllocator(uint64(0), allocConf, baseCfg) - r.targetAllocatorIntervalTicker = time.NewTicker(allocConf.Interval) - for { - <-r.targetAllocatorIntervalTicker.C - hash, err := r.syncTargetAllocator(savedHash, allocConf, baseCfg) - if err != nil { - r.settings.Logger.Error(err.Error()) - continue - } - savedHash = hash - //for jobName, targets := range r.scrapeManager.TargetsAll() { - // for _, target := range targets { - // r.settings.Logger.Info("target info", zap.String("job", jobName), - // zap.String("url", target.String()), - // zap.String("labels", target.Labels().String())) - // } - //} - //for _, provider := range r.discoveryManager.Providers() { - // r.settings.Logger.Info("discovery provider info", zap.Any("conf", provider.Config())) - //} - } - }() + r.initTargetAllocator(allocConf, baseCfg) } + r.reloadReady.Close() + return nil } +func (r *pReceiver) initTargetAllocator(allocConf *targetAllocator, baseCfg *config.Config) { + r.settings.Logger.Info("Starting target allocator discovery") + // immediately sync jobs and not wait for the first tick + savedHash, _ := r.syncTargetAllocator(uint64(0), allocConf, baseCfg) + r.targetAllocatorIntervalTicker = time.NewTicker(allocConf.Interval) + go func() { + <-r.reloadReady.C + for { + <-r.targetAllocatorIntervalTicker.C + hash, err := r.syncTargetAllocator(savedHash, allocConf, baseCfg) + if err != nil { + r.settings.Logger.Error(err.Error()) + continue + } + savedHash = hash + } + }() +} + // syncTargetAllocator request jobs from targetAllocator and update underlying receiver, if the response does not match the provided compareHash. // baseDiscoveryCfg can be used to provide additional ScrapeConfigs which will be added to the retrieved jobs. 
func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAllocator, baseCfg *config.Config) (uint64, error) { @@ -143,12 +161,20 @@ func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAll return hash, nil } - cfg := *baseCfg - for jobName, scrapeConfig := range scrapeConfigsResponse { + var filteredList []*relabel.Config + for _, relabelConfig := range scrapeConfig.RelabelConfigs { + if relabelConfig.SourceLabels.Len() > 0 && relabelConfig.SourceLabels[0] == "__tmp_hash" { + continue + } + filteredList = append(filteredList, relabelConfig) + } + scrapeConfig.RelabelConfigs = filteredList var httpSD promHTTP.SDConfig if allocConf.HTTPSDConfig == nil { - httpSD = promHTTP.SDConfig{} + httpSD = promHTTP.SDConfig{ + RefreshInterval: model.Duration(30 * time.Second), + } } else { httpSD = *allocConf.HTTPSDConfig } @@ -159,10 +185,10 @@ func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAll &httpSD, } - cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, scrapeConfig) + baseCfg.ScrapeConfigs = append(baseCfg.ScrapeConfigs, scrapeConfig) } - err = r.applyCfg(&cfg) + err = r.applyCfg(baseCfg) if err != nil { r.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err)) return 0, err @@ -171,6 +197,10 @@ func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAll return hash, nil } +func (r *pReceiver) instantiateShard(body []byte) []byte { + return bytes.ReplaceAll(body, []byte("$(SHARD)"), []byte(os.Getenv("SHARD"))) +} + func (r *pReceiver) getScrapeConfigsResponse(baseURL string) (map[string]*config.ScrapeConfig, error) { scrapeConfigsURL := fmt.Sprintf("%s/scrape_configs", baseURL) _, err := url.Parse(scrapeConfigsURL) // check if valid @@ -188,17 +218,13 @@ func (r *pReceiver) getScrapeConfigsResponse(baseURL string) (map[string]*config if err != nil { return nil, err } - r.settings.Logger.Info("body", zap.String("body", string(body))) + jobToScrapeConfig := map[string]*config.ScrapeConfig{} - err = yaml.Unmarshal(body, &jobToScrapeConfig) + envReplacedBody := r.instantiateShard(body) + err = yaml.Unmarshal(envReplacedBody, &jobToScrapeConfig) if err != nil { return nil, err } - for jobName, scrapeConfig := range jobToScrapeConfig { - for _, relabelConfig := range scrapeConfig.RelabelConfigs { - r.settings.Logger.Info("relabel config", zap.String("job", jobName), zap.Any("relabel", *relabelConfig), zap.String("regex", relabelConfig.Regex.String())) - } - } return jobToScrapeConfig, nil } @@ -210,10 +236,7 @@ func (r *pReceiver) applyCfg(cfg *config.Config) error { discoveryCfg := make(map[string]discovery.Configs) for _, scrapeConfig := range cfg.ScrapeConfigs { discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs - //r.settings.Logger.Info("Scrape job added", zap.String("jobName", scrapeConfig.JobName)) - r.settings.Logger.Info("Scrape job added", zap.String("jobName", scrapeConfig.JobName), - zap.Any("conf", *scrapeConfig), - ) + r.settings.Logger.Info("Scrape job added", zap.String("jobName", scrapeConfig.JobName)) } if err := r.discoveryManager.ApplyConfig(discoveryCfg); err != nil { return err @@ -225,6 +248,7 @@ func (r *pReceiver) initPrometheusComponents(ctx context.Context, host component r.discoveryManager = discovery.NewManager(ctx, logger) go func() { + r.settings.Logger.Info("Starting discovery manager") if err := r.discoveryManager.Run(); err != nil { r.settings.Logger.Error("Discovery manager failed", zap.Error(err)) host.ReportFatalError(err) @@ 
-251,22 +275,10 @@ func (r *pReceiver) initPrometheusComponents(ctx context.Context, host component ) r.scrapeManager = scrape.NewManager(&scrape.Options{PassMetadataInContext: true}, logger, store) - c := make(chan map[string][]*targetgroup.Group) - go func() { - for { - select { - case m := <-r.discoveryManager.SyncCh(): - for job, groups := range m { - for _, group := range groups { - r.settings.Logger.Info("target group info", zap.String("job", job), zap.Any("l", group.Labels), zap.Any("t", group.Targets)) - } - } - c <- m - } - } - }() go func() { - if err := r.scrapeManager.Run(c); err != nil { + <-r.reloadReady.C + r.settings.Logger.Info("Starting scrape manager") + if err := r.scrapeManager.Run(r.discoveryManager.SyncCh()); err != nil { r.settings.Logger.Error("Scrape manager failed", zap.Error(err)) host.ReportFatalError(err) } @@ -294,6 +306,7 @@ func gcInterval(cfg *config.Config) time.Duration { func (r *pReceiver) Shutdown(context.Context) error { r.cancelFunc() r.scrapeManager.Stop() + r.reloadReady.Close() if r.targetAllocatorIntervalTicker != nil { r.targetAllocatorIntervalTicker.Stop() } From f346f5916d2a7c9e638692a0dbff2b2de5be2afd Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Wed, 28 Sep 2022 18:46:55 -0400 Subject: [PATCH 04/15] Fix tests --- .tool-versions | 1 - .../prometheusreceiver/metrics_receiver.go | 32 ++--- .../metrics_receiver_target_allocator_test.go | 111 ++++++++++++++---- 3 files changed, 104 insertions(+), 40 deletions(-) delete mode 100644 .tool-versions diff --git a/.tool-versions b/.tool-versions deleted file mode 100644 index 00c8508d038e..000000000000 --- a/.tool-versions +++ /dev/null @@ -1 +0,0 @@ -golang 1.18.3 diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go index ab440c24872f..d1218198acb8 100644 --- a/receiver/prometheusreceiver/metrics_receiver.go +++ b/receiver/prometheusreceiver/metrics_receiver.go @@ -18,10 +18,8 @@ import ( "bytes" "context" "fmt" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/relabel" "gopkg.in/yaml.v2" - "io/ioutil" + "io" "net/http" "net/url" "os" @@ -31,6 +29,7 @@ import ( "github.com/go-kit/log" "github.com/mitchellh/hashstructure/v2" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" promHTTP "github.com/prometheus/prometheus/discovery/http" @@ -47,6 +46,7 @@ const ( gcIntervalDelta = 1 * time.Minute ) +// closeOnce ensures that the scrape manager is started after the discovery manager type closeOnce struct { C chan struct{} once sync.Once @@ -125,7 +125,11 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error { func (r *pReceiver) initTargetAllocator(allocConf *targetAllocator, baseCfg *config.Config) { r.settings.Logger.Info("Starting target allocator discovery") // immediately sync jobs and not wait for the first tick - savedHash, _ := r.syncTargetAllocator(uint64(0), allocConf, baseCfg) + savedHash, err := r.syncTargetAllocator(uint64(0), allocConf, baseCfg) + if err != nil { + r.settings.Logger.Error(err.Error()) + return + } r.targetAllocatorIntervalTicker = time.NewTicker(allocConf.Interval) go func() { <-r.reloadReady.C @@ -161,15 +165,10 @@ func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAll return hash, nil } + // Clear out the current configurations + baseCfg.ScrapeConfigs = []*config.ScrapeConfig{} + for jobName, 
scrapeConfig := range scrapeConfigsResponse { - var filteredList []*relabel.Config - for _, relabelConfig := range scrapeConfig.RelabelConfigs { - if relabelConfig.SourceLabels.Len() > 0 && relabelConfig.SourceLabels[0] == "__tmp_hash" { - continue - } - filteredList = append(filteredList, relabelConfig) - } - scrapeConfig.RelabelConfigs = filteredList var httpSD promHTTP.SDConfig if allocConf.HTTPSDConfig == nil { httpSD = promHTTP.SDConfig{ @@ -197,8 +196,13 @@ func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAll return hash, nil } +// instantiateShard inserts the SHARD environment variable in the returned configuration func (r *pReceiver) instantiateShard(body []byte) []byte { - return bytes.ReplaceAll(body, []byte("$(SHARD)"), []byte(os.Getenv("SHARD"))) + shard, ok := os.LookupEnv("SHARD") + if !ok { + shard = "0" + } + return bytes.ReplaceAll(body, []byte("$(SHARD)"), []byte(shard)) } func (r *pReceiver) getScrapeConfigsResponse(baseURL string) (map[string]*config.ScrapeConfig, error) { @@ -214,7 +218,7 @@ func (r *pReceiver) getScrapeConfigsResponse(baseURL string) (map[string]*config } defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) if err != nil { return nil, err } diff --git a/receiver/prometheusreceiver/metrics_receiver_target_allocator_test.go b/receiver/prometheusreceiver/metrics_receiver_target_allocator_test.go index ef1387889c48..790e52f381d4 100644 --- a/receiver/prometheusreceiver/metrics_receiver_target_allocator_test.go +++ b/receiver/prometheusreceiver/metrics_receiver_target_allocator_test.go @@ -29,7 +29,6 @@ import ( commonconfig "github.com/prometheus/common/config" "github.com/prometheus/common/model" promConfig "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/discovery" promHTTP "github.com/prometheus/prometheus/discovery/http" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" @@ -173,10 +172,26 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { desc: "default", responses: Responses{ responses: map[string][]mockTargetAllocatorResponseRaw{ - "/jobs": { - mockTargetAllocatorResponseRaw{code: 200, data: map[string]linkJSON{ - "job1": {Link: "/jobs/job1/targets"}, - "job2": {Link: "/jobs/job2/targets"}, + "/scrape_configs": { + mockTargetAllocatorResponseRaw{code: 200, data: map[string]map[string]interface{}{ + "job1": { + "job_name": "job1", + "scrape_interval": "30s", + "scrape_timeout": "30s", + "metrics_path": "/metrics", + "scheme": "http", + "relabel_configs": nil, + "metric_relabel_configs": nil, + }, + "job2": { + "job_name": "job2", + "scrape_interval": "30s", + "scrape_timeout": "30s", + "metrics_path": "/metrics", + "scheme": "http", + "relabel_configs": nil, + "metric_relabel_configs": nil, + }, }}, }, "/jobs/job1/targets": { @@ -252,10 +267,26 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { desc: "update labels and targets", responses: Responses{ responses: map[string][]mockTargetAllocatorResponseRaw{ - "/jobs": { - mockTargetAllocatorResponseRaw{code: 200, data: map[string]linkJSON{ - "job1": {Link: "/jobs/job1/targets"}, - "job2": {Link: "/jobs/job2/targets"}, + "/scrape_configs": { + mockTargetAllocatorResponseRaw{code: 200, data: map[string]map[string]interface{}{ + "job1": { + "job_name": "job1", + "scrape_interval": "30s", + "scrape_timeout": "30s", + "metrics_path": "/metrics", + "scheme": "http", + "relabel_configs": nil, + 
"metric_relabel_configs": nil, + }, + "job2": { + "job_name": "job2", + "scrape_interval": "30s", + "scrape_timeout": "30s", + "metrics_path": "/metrics", + "scheme": "http", + "relabel_configs": nil, + "metric_relabel_configs": nil, + }, }}, }, "/jobs/job1/targets": { @@ -326,17 +357,49 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { desc: "update job list", responses: Responses{ releaserMap: map[string]int{ - "/jobs": 1, + "/scrape_configs": 1, }, responses: map[string][]mockTargetAllocatorResponseRaw{ - "/jobs": { - mockTargetAllocatorResponseRaw{code: 200, data: map[string]linkJSON{ - "job1": {Link: "/jobs/job1/targets"}, - "job2": {Link: "/jobs/job2/targets"}, + "/scrape_configs": { + mockTargetAllocatorResponseRaw{code: 200, data: map[string]map[string]interface{}{ + "job1": { + "job_name": "job1", + "scrape_interval": "30s", + "scrape_timeout": "30s", + "metrics_path": "/metrics", + "scheme": "http", + "relabel_configs": nil, + "metric_relabel_configs": nil, + }, + "job2": { + "job_name": "job2", + "scrape_interval": "30s", + "scrape_timeout": "30s", + "metrics_path": "/metrics", + "scheme": "http", + "relabel_configs": nil, + "metric_relabel_configs": nil, + }, }}, - mockTargetAllocatorResponseRaw{code: 200, data: map[string]linkJSON{ - "job1": {Link: "/jobs/job1/targets"}, - "job3": {Link: "/jobs/job3/targets"}, + mockTargetAllocatorResponseRaw{code: 200, data: map[string]map[string]interface{}{ + "job1": { + "job_name": "job1", + "scrape_interval": "30s", + "scrape_timeout": "30s", + "metrics_path": "/metrics", + "scheme": "http", + "relabel_configs": nil, + "metric_relabel_configs": nil, + }, + "job3": { + "job_name": "job3", + "scrape_interval": "30s", + "scrape_timeout": "30s", + "metrics_path": "/metrics", + "scheme": "http", + "relabel_configs": nil, + "metric_relabel_configs": nil, + }, }}, }, "/jobs/job1/targets": { @@ -407,12 +470,12 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { desc: "endpoint is not reachable", responses: Responses{ releaserMap: map[string]int{ - "/jobs": 1, // we are too fast if we ignore the first wait a tick + "/scrape_configs": 1, // we are too fast if we ignore the first wait a tick }, responses: map[string][]mockTargetAllocatorResponseRaw{ - "/jobs": { - mockTargetAllocatorResponseRaw{code: 404, data: map[string]linkJSON{}}, - mockTargetAllocatorResponseRaw{code: 404, data: map[string]linkJSON{}}, + "/scrape_configs": { + mockTargetAllocatorResponseRaw{code: 404, data: map[string]map[string]interface{}{}}, + mockTargetAllocatorResponseRaw{code: 404, data: map[string]map[string]interface{}{}}, }, }, }, @@ -453,10 +516,8 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { providers := receiver.discoveryManager.Providers() if tc.want.empty { - // if no base config is supplied and the job retrieval fails and therefor no configuration is available - // PrometheusSD adds a static provider as default - require.Len(t, providers, 1) - require.IsType(t, discovery.StaticConfig{}, providers[0].Config()) + // if no base config is supplied and the job retrieval fails then no configuration should be found + require.Len(t, providers, 0) return } From a663cf6cff05a68cdb0d749f4742f729f57f1530 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Wed, 28 Sep 2022 18:54:04 -0400 Subject: [PATCH 05/15] release notes --- ...receiver_target_allocator_scrape_config.yaml | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 unreleased/prometheus_receiver_target_allocator_scrape_config.yaml diff --git 
a/unreleased/prometheus_receiver_target_allocator_scrape_config.yaml b/unreleased/prometheus_receiver_target_allocator_scrape_config.yaml
new file mode 100644
index 000000000000..b75229a9cf89
--- /dev/null
+++ b/unreleased/prometheus_receiver_target_allocator_scrape_config.yaml
@@ -0,0 +1,17 @@
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: prometheusreceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: changes to use the new scrape_configs endpoint in the target allocator to dynamically pull scrape configuration.
+
+# One or more tracking issues related to the change
+issues:
+  - https://github.com/open-telemetry/opentelemetry-operator/issues/1106
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:

From 67857378912eab49a109005e5c80dd46ff35b756 Mon Sep 17 00:00:00 2001
From: Jacob Aronoff
Date: Wed, 28 Sep 2022 18:55:06 -0400
Subject: [PATCH 06/15] Import order

---
 receiver/prometheusreceiver/metrics_receiver.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go
index d1218198acb8..ea5cfb54618b 100644
--- a/receiver/prometheusreceiver/metrics_receiver.go
+++ b/receiver/prometheusreceiver/metrics_receiver.go
@@ -18,7 +18,6 @@ import (
 	"bytes"
 	"context"
 	"fmt"
-	"gopkg.in/yaml.v2"
 	"io"
 	"net/http"
 	"net/url"
@@ -37,6 +36,7 @@ import (
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/consumer"
 	"go.uber.org/zap"
+	"gopkg.in/yaml.v2"

 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"
 )

From 16501d99dde6237ad912ef50b24e7d8a0bb3149f Mon Sep 17 00:00:00 2001
From: Jacob Aronoff
Date: Thu, 29 Sep 2022 10:44:36 -0400
Subject: [PATCH 07/15] linting

---
 receiver/prometheusreceiver/metrics_receiver.go | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)

diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go
index ea5cfb54618b..e4a2e94f746d 100644
--- a/receiver/prometheusreceiver/metrics_receiver.go
+++ b/receiver/prometheusreceiver/metrics_receiver.go
@@ -66,10 +66,6 @@ type pReceiver struct {
 	targetAllocatorIntervalTicker *time.Ticker
 }

-type linkJSON struct {
-	Link string `json:"_link"`
-}
-
 // New creates a new prometheus.Receiver reference.
func newPrometheusReceiver(set component.ReceiverCreateSettings, cfg *Config, next consumer.Metrics) *pReceiver { reloadReady := &closeOnce{ @@ -114,7 +110,7 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error { allocConf := r.cfg.TargetAllocator if allocConf != nil { - r.initTargetAllocator(allocConf, baseCfg) + err = r.initTargetAllocator(allocConf, baseCfg) } r.reloadReady.Close() @@ -122,13 +118,12 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error { return nil } -func (r *pReceiver) initTargetAllocator(allocConf *targetAllocator, baseCfg *config.Config) { +func (r *pReceiver) initTargetAllocator(allocConf *targetAllocator, baseCfg *config.Config) error { r.settings.Logger.Info("Starting target allocator discovery") // immediately sync jobs and not wait for the first tick savedHash, err := r.syncTargetAllocator(uint64(0), allocConf, baseCfg) if err != nil { - r.settings.Logger.Error(err.Error()) - return + return err } r.targetAllocatorIntervalTicker = time.NewTicker(allocConf.Interval) go func() { @@ -143,6 +138,7 @@ func (r *pReceiver) initTargetAllocator(allocConf *targetAllocator, baseCfg *con savedHash = hash } }() + return err } // syncTargetAllocator request jobs from targetAllocator and update underlying receiver, if the response does not match the provided compareHash. From 92650815ef6bc8f69db4c7ead1d33c2ee4fff1fd Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Thu, 29 Sep 2022 10:46:13 -0400 Subject: [PATCH 08/15] return on err --- receiver/prometheusreceiver/metrics_receiver.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go index e4a2e94f746d..17d31b5f9363 100644 --- a/receiver/prometheusreceiver/metrics_receiver.go +++ b/receiver/prometheusreceiver/metrics_receiver.go @@ -111,6 +111,9 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error { allocConf := r.cfg.TargetAllocator if allocConf != nil { err = r.initTargetAllocator(allocConf, baseCfg) + if err != nil { + return err + } } r.reloadReady.Close() From 7752198acee392e4b70b78a906e1f6dfc2e1ef05 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Thu, 29 Sep 2022 11:17:15 -0400 Subject: [PATCH 09/15] fix more linting --- receiver/prometheusreceiver/metrics_receiver.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go index 17d31b5f9363..04cca1154e7d 100644 --- a/receiver/prometheusreceiver/metrics_receiver.go +++ b/receiver/prometheusreceiver/metrics_receiver.go @@ -85,7 +85,7 @@ func newPrometheusReceiver(set component.ReceiverCreateSettings, cfg *Config, ne return pr } -// Start is the method that starts Prometheus scraping and it +// Start is the method that starts Prometheus scraping. It // is controlled by having previously defined a Configuration using perhaps New. 
func (r *pReceiver) Start(_ context.Context, host component.Host) error { discoveryCtx, cancel := context.WithCancel(context.Background()) @@ -133,9 +133,9 @@ func (r *pReceiver) initTargetAllocator(allocConf *targetAllocator, baseCfg *con <-r.reloadReady.C for { <-r.targetAllocatorIntervalTicker.C - hash, err := r.syncTargetAllocator(savedHash, allocConf, baseCfg) - if err != nil { - r.settings.Logger.Error(err.Error()) + hash, newErr := r.syncTargetAllocator(savedHash, allocConf, baseCfg) + if newErr != nil { + r.settings.Logger.Error(newErr.Error()) continue } savedHash = hash @@ -216,7 +216,6 @@ func (r *pReceiver) getScrapeConfigsResponse(baseURL string) (map[string]*config return nil, err } - defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return nil, err @@ -228,6 +227,10 @@ func (r *pReceiver) getScrapeConfigsResponse(baseURL string) (map[string]*config if err != nil { return nil, err } + err = resp.Body.Close() + if err != nil { + return nil, err + } return jobToScrapeConfig, nil } From a46743edad8c4c67e2a19cb67722966d252c3ade Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Thu, 29 Sep 2022 11:21:13 -0400 Subject: [PATCH 10/15] use new issue --- .../prometheus_receiver_target_allocator_scrape_config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unreleased/prometheus_receiver_target_allocator_scrape_config.yaml b/unreleased/prometheus_receiver_target_allocator_scrape_config.yaml index b75229a9cf89..9c91670b2e43 100644 --- a/unreleased/prometheus_receiver_target_allocator_scrape_config.yaml +++ b/unreleased/prometheus_receiver_target_allocator_scrape_config.yaml @@ -9,7 +9,7 @@ note: changes to use the new scrape_configs endpoint in the target allocator to # One or more tracking issues related to the change issues: - - https://github.com/open-telemetry/opentelemetry-operator/issues/1106 + - 14597 # (Optional) One or more lines of additional information to render under the primary note. # These lines will be padded with 2 spaces and then inserted directly into the document. From ea9cd7fdee596b3bc25c0d281fdf95e1f0497ef9 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Tue, 4 Oct 2022 10:57:24 -0400 Subject: [PATCH 11/15] Renamed channel, moved initializer --- receiver/prometheusreceiver/metrics_receiver.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go index 04cca1154e7d..6773abfd29a2 100644 --- a/receiver/prometheusreceiver/metrics_receiver.go +++ b/receiver/prometheusreceiver/metrics_receiver.go @@ -48,9 +48,9 @@ const ( // closeOnce ensures that the scrape manager is started after the discovery manager type closeOnce struct { - C chan struct{} - once sync.Once - Close func() + Loaded chan struct{} + once sync.Once + Close func() } // pReceiver is the type that provides Prometheus scraper/receiver functionality. @@ -69,11 +69,11 @@ type pReceiver struct { // New creates a new prometheus.Receiver reference. 
func newPrometheusReceiver(set component.ReceiverCreateSettings, cfg *Config, next consumer.Metrics) *pReceiver { reloadReady := &closeOnce{ - C: make(chan struct{}), + Loaded: make(chan struct{}), } reloadReady.Close = func() { reloadReady.once.Do(func() { - close(reloadReady.C) + close(reloadReady.Loaded) }) } pr := &pReceiver{ @@ -128,9 +128,9 @@ func (r *pReceiver) initTargetAllocator(allocConf *targetAllocator, baseCfg *con if err != nil { return err } - r.targetAllocatorIntervalTicker = time.NewTicker(allocConf.Interval) go func() { - <-r.reloadReady.C + <-r.reloadReady.Loaded + r.targetAllocatorIntervalTicker = time.NewTicker(allocConf.Interval) for { <-r.targetAllocatorIntervalTicker.C hash, newErr := r.syncTargetAllocator(savedHash, allocConf, baseCfg) @@ -282,7 +282,7 @@ func (r *pReceiver) initPrometheusComponents(ctx context.Context, host component r.scrapeManager = scrape.NewManager(&scrape.Options{PassMetadataInContext: true}, logger, store) go func() { - <-r.reloadReady.C + <-r.reloadReady.Loaded r.settings.Logger.Info("Starting scrape manager") if err := r.scrapeManager.Run(r.discoveryManager.SyncCh()); err != nil { r.settings.Logger.Error("Scrape manager failed", zap.Error(err)) From 2631ea968b0678fd453061acd9f07983a3489462 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Tue, 4 Oct 2022 12:14:48 -0400 Subject: [PATCH 12/15] update from feedback --- .../prometheusreceiver/metrics_receiver.go | 67 +++++++++---------- 1 file changed, 31 insertions(+), 36 deletions(-) diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go index 6773abfd29a2..3d62d73d83b8 100644 --- a/receiver/prometheusreceiver/metrics_receiver.go +++ b/receiver/prometheusreceiver/metrics_receiver.go @@ -46,41 +46,29 @@ const ( gcIntervalDelta = 1 * time.Minute ) -// closeOnce ensures that the scrape manager is started after the discovery manager -type closeOnce struct { - Loaded chan struct{} - once sync.Once - Close func() -} - // pReceiver is the type that provides Prometheus scraper/receiver functionality. type pReceiver struct { - cfg *Config - consumer consumer.Metrics - cancelFunc context.CancelFunc - reloadReady *closeOnce + cfg *Config + consumer consumer.Metrics + cancelFunc context.CancelFunc + configLoaded chan struct{} + loadConfigOnce sync.Once settings component.ReceiverCreateSettings scrapeManager *scrape.Manager discoveryManager *discovery.Manager targetAllocatorIntervalTicker *time.Ticker + targetAllocatorStop chan bool } // New creates a new prometheus.Receiver reference. 
func newPrometheusReceiver(set component.ReceiverCreateSettings, cfg *Config, next consumer.Metrics) *pReceiver { - reloadReady := &closeOnce{ - Loaded: make(chan struct{}), - } - reloadReady.Close = func() { - reloadReady.once.Do(func() { - close(reloadReady.Loaded) - }) - } pr := &pReceiver{ - cfg: cfg, - consumer: next, - settings: set, - reloadReady: reloadReady, + cfg: cfg, + consumer: next, + settings: set, + configLoaded: make(chan struct{}), + targetAllocatorStop: make(chan bool, 1), } return pr } @@ -110,18 +98,20 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error { allocConf := r.cfg.TargetAllocator if allocConf != nil { - err = r.initTargetAllocator(allocConf, baseCfg) + err = r.startTargetAllocator(allocConf, baseCfg) if err != nil { return err } } - r.reloadReady.Close() + r.loadConfigOnce.Do(func() { + close(r.configLoaded) + }) return nil } -func (r *pReceiver) initTargetAllocator(allocConf *targetAllocator, baseCfg *config.Config) error { +func (r *pReceiver) startTargetAllocator(allocConf *targetAllocator, baseCfg *config.Config) error { r.settings.Logger.Info("Starting target allocator discovery") // immediately sync jobs and not wait for the first tick savedHash, err := r.syncTargetAllocator(uint64(0), allocConf, baseCfg) @@ -129,19 +119,23 @@ func (r *pReceiver) initTargetAllocator(allocConf *targetAllocator, baseCfg *con return err } go func() { - <-r.reloadReady.Loaded r.targetAllocatorIntervalTicker = time.NewTicker(allocConf.Interval) for { - <-r.targetAllocatorIntervalTicker.C - hash, newErr := r.syncTargetAllocator(savedHash, allocConf, baseCfg) - if newErr != nil { - r.settings.Logger.Error(newErr.Error()) - continue + select { + case <-r.targetAllocatorIntervalTicker.C: + hash, newErr := r.syncTargetAllocator(savedHash, allocConf, baseCfg) + if newErr != nil { + r.settings.Logger.Error(newErr.Error()) + continue + } + savedHash = hash + case <-r.targetAllocatorStop: + r.settings.Logger.Info("Stopping target allocator") + return } - savedHash = hash } }() - return err + return nil } // syncTargetAllocator request jobs from targetAllocator and update underlying receiver, if the response does not match the provided compareHash. 
@@ -282,7 +276,8 @@ func (r *pReceiver) initPrometheusComponents(ctx context.Context, host component r.scrapeManager = scrape.NewManager(&scrape.Options{PassMetadataInContext: true}, logger, store) go func() { - <-r.reloadReady.Loaded + // The scrape manager needs to wait for the configuration to be loaded before beginning + <-r.configLoaded r.settings.Logger.Info("Starting scrape manager") if err := r.scrapeManager.Run(r.discoveryManager.SyncCh()); err != nil { r.settings.Logger.Error("Scrape manager failed", zap.Error(err)) @@ -312,9 +307,9 @@ func gcInterval(cfg *config.Config) time.Duration { func (r *pReceiver) Shutdown(context.Context) error { r.cancelFunc() r.scrapeManager.Stop() - r.reloadReady.Close() if r.targetAllocatorIntervalTicker != nil { r.targetAllocatorIntervalTicker.Stop() + r.targetAllocatorStop <- true } return nil } From b8351712a364788fe49e07de948cb87a6317aa02 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Tue, 4 Oct 2022 14:07:56 -0400 Subject: [PATCH 13/15] Updated from feedback --- .../prometheusreceiver/metrics_receiver.go | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go index 3d62d73d83b8..fdc8141f236f 100644 --- a/receiver/prometheusreceiver/metrics_receiver.go +++ b/receiver/prometheusreceiver/metrics_receiver.go @@ -54,11 +54,10 @@ type pReceiver struct { configLoaded chan struct{} loadConfigOnce sync.Once - settings component.ReceiverCreateSettings - scrapeManager *scrape.Manager - discoveryManager *discovery.Manager - targetAllocatorIntervalTicker *time.Ticker - targetAllocatorStop chan bool + settings component.ReceiverCreateSettings + scrapeManager *scrape.Manager + discoveryManager *discovery.Manager + targetAllocatorStop chan bool } // New creates a new prometheus.Receiver reference. 
@@ -119,10 +118,10 @@ func (r *pReceiver) startTargetAllocator(allocConf *targetAllocator, baseCfg *co return err } go func() { - r.targetAllocatorIntervalTicker = time.NewTicker(allocConf.Interval) + targetAllocatorIntervalTicker := time.NewTicker(allocConf.Interval) for { select { - case <-r.targetAllocatorIntervalTicker.C: + case <-targetAllocatorIntervalTicker.C: hash, newErr := r.syncTargetAllocator(savedHash, allocConf, baseCfg) if newErr != nil { r.settings.Logger.Error(newErr.Error()) @@ -130,6 +129,7 @@ func (r *pReceiver) startTargetAllocator(allocConf *targetAllocator, baseCfg *co } savedHash = hash case <-r.targetAllocatorStop: + targetAllocatorIntervalTicker.Stop() r.settings.Logger.Info("Stopping target allocator") return } @@ -307,9 +307,6 @@ func gcInterval(cfg *config.Config) time.Duration { func (r *pReceiver) Shutdown(context.Context) error { r.cancelFunc() r.scrapeManager.Stop() - if r.targetAllocatorIntervalTicker != nil { - r.targetAllocatorIntervalTicker.Stop() - r.targetAllocatorStop <- true - } + close(r.targetAllocatorStop) return nil } From ff01d23d15f35a12598d293006bc0b3632709bfa Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Tue, 4 Oct 2022 16:06:22 -0400 Subject: [PATCH 14/15] Re-order --- .../prometheusreceiver/metrics_receiver.go | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go index fdc8141f236f..3f9f0aed0be9 100644 --- a/receiver/prometheusreceiver/metrics_receiver.go +++ b/receiver/prometheusreceiver/metrics_receiver.go @@ -48,16 +48,16 @@ const ( // pReceiver is the type that provides Prometheus scraper/receiver functionality. type pReceiver struct { - cfg *Config - consumer consumer.Metrics - cancelFunc context.CancelFunc - configLoaded chan struct{} - loadConfigOnce sync.Once - - settings component.ReceiverCreateSettings - scrapeManager *scrape.Manager - discoveryManager *discovery.Manager - targetAllocatorStop chan bool + cfg *Config + consumer consumer.Metrics + cancelFunc context.CancelFunc + targetAllocatorStop chan struct{} + configLoaded chan struct{} + loadConfigOnce sync.Once + + settings component.ReceiverCreateSettings + scrapeManager *scrape.Manager + discoveryManager *discovery.Manager } // New creates a new prometheus.Receiver reference. 
@@ -67,7 +67,7 @@ func newPrometheusReceiver(set component.ReceiverCreateSettings, cfg *Config, ne
 		consumer:            next,
 		settings:            set,
 		configLoaded:        make(chan struct{}),
-		targetAllocatorStop: make(chan bool, 1),
+		targetAllocatorStop: make(chan struct{}),
 	}
 	return pr
 }

From 831ac59ee92890605f4304b88030e95b801fde71 Mon Sep 17 00:00:00 2001
From: Jacob Aronoff
Date: Tue, 4 Oct 2022 17:38:50 -0400
Subject: [PATCH 15/15] Comment change to retrigger CI

---
 receiver/prometheusreceiver/metrics_receiver.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go
index 3f9f0aed0be9..c3ce9aa557d6 100644
--- a/receiver/prometheusreceiver/metrics_receiver.go
+++ b/receiver/prometheusreceiver/metrics_receiver.go
@@ -112,7 +112,7 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error {

 func (r *pReceiver) startTargetAllocator(allocConf *targetAllocator, baseCfg *config.Config) error {
 	r.settings.Logger.Info("Starting target allocator discovery")
-	// immediately sync jobs and not wait for the first tick
+	// immediately sync jobs, not waiting for the first tick
 	savedHash, err := r.syncTargetAllocator(uint64(0), allocConf, baseCfg)
 	if err != nil {
 		return err