From 2e68377aefda085fa85ed2f458fdea5b1e78ae97 Mon Sep 17 00:00:00 2001 From: Dmitry Bulashev Date: Fri, 24 Jan 2025 19:49:25 +0500 Subject: [PATCH] Throttling interval (#82) --- deploy/pgscv.yaml | 4 ++++ internal/pgscv/config.go | 30 +++++++++++++++++++++++++----- internal/pgscv/pgscv.go | 28 ++++++++++++++++++++++++++-- internal/service/service.go | 3 ++- 4 files changed, 57 insertions(+), 8 deletions(-) diff --git a/deploy/pgscv.yaml b/deploy/pgscv.yaml index 72600a3..986aa5c 100644 --- a/deploy/pgscv.yaml +++ b/deploy/pgscv.yaml @@ -9,6 +9,9 @@ #collect_top_table: 10 #collect_top_index: 10 #skip_conn_error_mode: false +#url_prefix: "example.com" +#conn_timeout: 3 +#throttling_interval: 5 #discovery: # yandex_mdb: # type: yandex-mdb @@ -25,6 +28,7 @@ # exclude_name: # exclude_db: + services: "postgres:5432": service_type: "postgres" diff --git a/internal/pgscv/config.go b/internal/pgscv/config.go index c296006..0f092dd 100644 --- a/internal/pgscv/config.go +++ b/internal/pgscv/config.go @@ -19,11 +19,12 @@ import ( ) const ( - defaultListenAddress = "127.0.0.1:9890" - defaultPostgresUsername = "pgscv" - defaultPostgresDbname = "postgres" - defaultPgbouncerUsername = "pgscv" - defaultPgbouncerDbname = "pgbouncer" + defaultListenAddress = "127.0.0.1:9890" + defaultPostgresUsername = "pgscv" + defaultPostgresDbname = "postgres" + defaultPgbouncerUsername = "pgscv" + defaultPgbouncerDbname = "pgbouncer" + defaultThrottlingInterval int = 0 // seconds ) // Config defines application's configuration. @@ -45,6 +46,7 @@ type Config struct { DiscoveryServices *map[string]sd.Discovery ConnTimeout int `yaml:"conn_timeout"` URLPrefix string `yaml:"url_prefix"` // Url prefix + ThrottlingInterval *int `yaml:"throttling_interval"` } // NewConfig creates new config based on config file or return default config if config file is not specified. @@ -123,6 +125,9 @@ func NewConfig(configFilePath string) (*Config, error) { if configFromEnv.URLPrefix != "" { configFromFile.URLPrefix = configFromEnv.URLPrefix } + if configFromEnv.ThrottlingInterval != nil { + configFromFile.ThrottlingInterval = configFromEnv.ThrottlingInterval + } return configFromFile, nil } @@ -281,6 +286,15 @@ func (c *Config) Validate() error { log.Infof("ConnTimeout: %d seconds timeout set for connecting to DB", c.ConnTimeout) } + if c.ThrottlingInterval == nil { + throttlingInterval := defaultThrottlingInterval + c.ThrottlingInterval = &throttlingInterval + } + + if *c.ThrottlingInterval > 0 { + log.Infof("ThrottlingInterval: %d seconds throttling interval set for scrape metrics", *c.ThrottlingInterval) + } + return nil } @@ -448,6 +462,12 @@ func newConfigFromEnv() (*Config, error) { config.ConnTimeout = timeout case "PGSCV_URL_PREFIX": config.URLPrefix = value + case "PGSCV_THROTTLING_INTERVAL": + throttlingInterval, err := strconv.Atoi(value) + if err != nil { + return nil, fmt.Errorf("invalid setting PGSCV_THROTTLING_INTERVAL, value '%s': %s", value, err) + } + config.ThrottlingInterval = &throttlingInterval } } return config, nil diff --git a/internal/pgscv/pgscv.go b/internal/pgscv/pgscv.go index a899e58..9b058a6 100644 --- a/internal/pgscv/pgscv.go +++ b/internal/pgscv/pgscv.go @@ -16,6 +16,7 @@ import ( net_http "net/http" "strings" "sync" + "time" ) const pgSCVSubscriber = "pgscv_subscriber" @@ -38,6 +39,7 @@ func Start(ctx context.Context, config *Config) error { CollectTopQuery: config.CollectTopQuery, SkipConnErrorMode: config.SkipConnErrorMode, ConnTimeout: config.ConnTimeout, + ThrottlingInterval: config.ThrottlingInterval, } if len(config.ServicesConnsSettings) == 0 && config.DiscoveryServices == nil { @@ -153,9 +155,31 @@ func subscribeYandex(ds *sd.Discovery, config *Config, serviceRepo *service.Repo } // getMetricsHandler return http handler function to /metrics endpoint -func getMetricsHandler(repository *service.Repository) func(w net_http.ResponseWriter, r *net_http.Request) { +func getMetricsHandler(repository *service.Repository, throttlingInterval *int) func(w net_http.ResponseWriter, r *net_http.Request) { + throttle := struct { + sync.RWMutex + lastScrapeTime map[string]time.Time + }{ + lastScrapeTime: make(map[string]time.Time), + } + return func(w net_http.ResponseWriter, r *net_http.Request) { target := r.URL.Query().Get("target") + if throttlingInterval != nil && *throttlingInterval > 0 { + throttle.RLock() + t, ok := throttle.lastScrapeTime[target] + throttle.RUnlock() + if ok { + if time.Now().Sub(t) < time.Duration(*throttlingInterval)*time.Second { + w.WriteHeader(http.StatusOK) + log.Warn("Skip scraping") + return + } + } + throttle.Lock() + throttle.lastScrapeTime[target] = time.Now() + throttle.Unlock() + } if target == "" { h := promhttp.InstrumentMetricHandler( prometheus.DefaultRegisterer, promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{}), @@ -218,7 +242,7 @@ func runMetricsListener(ctx context.Context, config *Config, repository *service Addr: config.ListenAddress, AuthConfig: config.AuthConfig, } - srv := http.NewServer(sCfg, getMetricsHandler(repository), getTargetsHandler(repository, config.URLPrefix, config.AuthConfig.EnableTLS)) + srv := http.NewServer(sCfg, getMetricsHandler(repository, config.ThrottlingInterval), getTargetsHandler(repository, config.URLPrefix, config.AuthConfig.EnableTLS)) errCh := make(chan error) defer close(errCh) diff --git a/internal/service/service.go b/internal/service/service.go index 485320e..61e781c 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -51,7 +51,8 @@ type Config struct { CollectTopQuery int SkipConnErrorMode bool ConstLabels *map[string]*map[string]string - ConnTimeout int // in seconds + ConnTimeout int // in seconds + ThrottlingInterval *int // in seconds, default 25 } // Collector is an interface for prometheus.Collector.