Skip to content

Commit

Permalink
Throttling interval (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
dbulashev authored Jan 24, 2025
1 parent 77d27ff commit 2e68377
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 8 deletions.
4 changes: 4 additions & 0 deletions deploy/pgscv.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
#collect_top_table: 10
#collect_top_index: 10
#skip_conn_error_mode: false
#url_prefix: "example.com"
#conn_timeout: 3
#throttling_interval: 5
#discovery:
# yandex_mdb:
# type: yandex-mdb
Expand All @@ -25,6 +28,7 @@
# exclude_name:
# exclude_db:


services:
"postgres:5432":
service_type: "postgres"
Expand Down
30 changes: 25 additions & 5 deletions internal/pgscv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ import (
)

const (
defaultListenAddress = "127.0.0.1:9890"
defaultPostgresUsername = "pgscv"
defaultPostgresDbname = "postgres"
defaultPgbouncerUsername = "pgscv"
defaultPgbouncerDbname = "pgbouncer"
defaultListenAddress = "127.0.0.1:9890"
defaultPostgresUsername = "pgscv"
defaultPostgresDbname = "postgres"
defaultPgbouncerUsername = "pgscv"
defaultPgbouncerDbname = "pgbouncer"
defaultThrottlingInterval int = 0 // seconds
)

// Config defines application's configuration.
Expand All @@ -45,6 +46,7 @@ type Config struct {
DiscoveryServices *map[string]sd.Discovery
ConnTimeout int `yaml:"conn_timeout"`
URLPrefix string `yaml:"url_prefix"` // Url prefix
ThrottlingInterval *int `yaml:"throttling_interval"`
}

// NewConfig creates new config based on config file or return default config if config file is not specified.
Expand Down Expand Up @@ -123,6 +125,9 @@ func NewConfig(configFilePath string) (*Config, error) {
if configFromEnv.URLPrefix != "" {
configFromFile.URLPrefix = configFromEnv.URLPrefix
}
if configFromEnv.ThrottlingInterval != nil {
configFromFile.ThrottlingInterval = configFromEnv.ThrottlingInterval
}
return configFromFile, nil
}

Expand Down Expand Up @@ -281,6 +286,15 @@ func (c *Config) Validate() error {
log.Infof("ConnTimeout: %d seconds timeout set for connecting to DB", c.ConnTimeout)
}

if c.ThrottlingInterval == nil {
throttlingInterval := defaultThrottlingInterval
c.ThrottlingInterval = &throttlingInterval
}

if *c.ThrottlingInterval > 0 {
log.Infof("ThrottlingInterval: %d seconds throttling interval set for scrape metrics", *c.ThrottlingInterval)
}

return nil
}

Expand Down Expand Up @@ -448,6 +462,12 @@ func newConfigFromEnv() (*Config, error) {
config.ConnTimeout = timeout
case "PGSCV_URL_PREFIX":
config.URLPrefix = value
case "PGSCV_THROTTLING_INTERVAL":
throttlingInterval, err := strconv.Atoi(value)
if err != nil {
return nil, fmt.Errorf("invalid setting PGSCV_THROTTLING_INTERVAL, value '%s': %s", value, err)
}
config.ThrottlingInterval = &throttlingInterval
}
}
return config, nil
Expand Down
28 changes: 26 additions & 2 deletions internal/pgscv/pgscv.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
net_http "net/http"
"strings"
"sync"
"time"
)

const pgSCVSubscriber = "pgscv_subscriber"
Expand All @@ -38,6 +39,7 @@ func Start(ctx context.Context, config *Config) error {
CollectTopQuery: config.CollectTopQuery,
SkipConnErrorMode: config.SkipConnErrorMode,
ConnTimeout: config.ConnTimeout,
ThrottlingInterval: config.ThrottlingInterval,
}

if len(config.ServicesConnsSettings) == 0 && config.DiscoveryServices == nil {
Expand Down Expand Up @@ -153,9 +155,31 @@ func subscribeYandex(ds *sd.Discovery, config *Config, serviceRepo *service.Repo
}

// getMetricsHandler return http handler function to /metrics endpoint
func getMetricsHandler(repository *service.Repository) func(w net_http.ResponseWriter, r *net_http.Request) {
func getMetricsHandler(repository *service.Repository, throttlingInterval *int) func(w net_http.ResponseWriter, r *net_http.Request) {
throttle := struct {
sync.RWMutex
lastScrapeTime map[string]time.Time
}{
lastScrapeTime: make(map[string]time.Time),
}

return func(w net_http.ResponseWriter, r *net_http.Request) {
target := r.URL.Query().Get("target")
if throttlingInterval != nil && *throttlingInterval > 0 {
throttle.RLock()
t, ok := throttle.lastScrapeTime[target]
throttle.RUnlock()
if ok {
if time.Now().Sub(t) < time.Duration(*throttlingInterval)*time.Second {
w.WriteHeader(http.StatusOK)
log.Warn("Skip scraping")
return
}
}
throttle.Lock()
throttle.lastScrapeTime[target] = time.Now()
throttle.Unlock()
}
if target == "" {
h := promhttp.InstrumentMetricHandler(
prometheus.DefaultRegisterer, promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{}),
Expand Down Expand Up @@ -218,7 +242,7 @@ func runMetricsListener(ctx context.Context, config *Config, repository *service
Addr: config.ListenAddress,
AuthConfig: config.AuthConfig,
}
srv := http.NewServer(sCfg, getMetricsHandler(repository), getTargetsHandler(repository, config.URLPrefix, config.AuthConfig.EnableTLS))
srv := http.NewServer(sCfg, getMetricsHandler(repository, config.ThrottlingInterval), getTargetsHandler(repository, config.URLPrefix, config.AuthConfig.EnableTLS))

errCh := make(chan error)
defer close(errCh)
Expand Down
3 changes: 2 additions & 1 deletion internal/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ type Config struct {
CollectTopQuery int
SkipConnErrorMode bool
ConstLabels *map[string]*map[string]string
ConnTimeout int // in seconds
ConnTimeout int // in seconds
ThrottlingInterval *int // in seconds, default 25
}

// Collector is an interface for prometheus.Collector.
Expand Down

0 comments on commit 2e68377

Please sign in to comment.