Skip to content

Commit

Permalink
feat(agent): Introduce CLI option to set config URL retry attempts (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
powersj authored May 21, 2024
1 parent d8aa46e commit ad59290
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 55 deletions.
32 changes: 19 additions & 13 deletions cmd/telegraf/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,19 +221,20 @@ func runApp(args []string, outputBuffer io.Writer, pprof Server, c TelegrafConfi
filters := processFilterFlags(cCtx)

g := GlobalFlags{
config: cCtx.StringSlice("config"),
configDir: cCtx.StringSlice("config-directory"),
testWait: cCtx.Int("test-wait"),
watchConfig: cCtx.String("watch-config"),
pidFile: cCtx.String("pidfile"),
plugindDir: cCtx.String("plugin-directory"),
password: cCtx.String("password"),
oldEnvBehavior: cCtx.Bool("old-env-behavior"),
test: cCtx.Bool("test"),
debug: cCtx.Bool("debug"),
once: cCtx.Bool("once"),
quiet: cCtx.Bool("quiet"),
unprotected: cCtx.Bool("unprotected"),
config: cCtx.StringSlice("config"),
configDir: cCtx.StringSlice("config-directory"),
testWait: cCtx.Int("test-wait"),
configURLRetryAttempts: cCtx.Int("config-url-retry-attempts"),
watchConfig: cCtx.String("watch-config"),
pidFile: cCtx.String("pidfile"),
plugindDir: cCtx.String("plugin-directory"),
password: cCtx.String("password"),
oldEnvBehavior: cCtx.Bool("old-env-behavior"),
test: cCtx.Bool("test"),
debug: cCtx.Bool("debug"),
once: cCtx.Bool("once"),
quiet: cCtx.Bool("quiet"),
unprotected: cCtx.Bool("unprotected"),
}

w := WindowFlags{
Expand Down Expand Up @@ -275,6 +276,11 @@ func runApp(args []string, outputBuffer io.Writer, pprof Server, c TelegrafConfi
Name: "test-wait",
Usage: "wait up to this many seconds for service inputs to complete in test mode",
},
&cli.IntFlag{
Name: "config-url-retry-attempts",
Usage: "Number of attempts to obtain a remote configuration via a URL during startup. " +
"Set to -1 for unlimited attempts. (default: 3)",
},
//
// String flags
&cli.StringFlag{
Expand Down
28 changes: 15 additions & 13 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,20 @@ import (
var stop chan struct{}

// GlobalFlags holds the values of the global command-line options after they
// have been read from the CLI context.
type GlobalFlags struct {
	config    []string // values of the "config" flag
	configDir []string // values of the "config-directory" flag
	testWait  int      // value of the "test-wait" flag
	// configURLRetryAttempts is the value of the "config-url-retry-attempts"
	// flag; it is copied into the agent configuration before the
	// configuration files are loaded.
	configURLRetryAttempts int
	watchConfig            string // value of the "watch-config" flag
	pidFile                string // value of the "pidfile" flag
	plugindDir             string // value of the "plugin-directory" flag
	password               string // value of the "password" flag
	oldEnvBehavior         bool   // value of the "old-env-behavior" flag
	test                   bool   // value of the "test" flag
	debug                  bool   // value of the "debug" flag
	once                   bool   // value of the "once" flag
	quiet                  bool   // value of the "quiet" flag
	unprotected            bool   // value of the "unprotected" flag
}

type WindowFlags struct {
Expand Down Expand Up @@ -248,6 +249,7 @@ func (t *Telegraf) loadConfiguration() (*config.Config, error) {
configFiles = append(configFiles, defaultFiles...)
}

c.Agent.ConfigURLRetryAttempts = t.configURLRetryAttempts
t.configFiles = configFiles
if err := c.LoadAll(configFiles...); err != nil {
return c, err
Expand Down
79 changes: 51 additions & 28 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,10 @@ type AgentConfig struct {
// By default, processors are run a second time after aggregators. Changing
// this setting to true will skip the second run of processors.
SkipProcessorsAfterAggregators bool `toml:"skip_processors_after_aggregators"`

// Number of attempts to obtain a remote configuration via a URL during
// startup. Set to -1 for unlimited attempts; 0 selects the default of 3
// attempts. Any other negative value is rejected as invalid.
ConfigURLRetryAttempts int `toml:"config-url-retry-attempts"`
}

// InputNames returns a list of strings of the configured inputs.
Expand Down Expand Up @@ -449,7 +453,7 @@ func (c *Config) LoadConfig(path string) error {
log.Printf("I! Loading config: %s", path)
}

data, _, err := LoadConfigFile(path)
data, _, err := LoadConfigFileWithRetries(path, c.Agent.ConfigURLRetryAttempts)
if err != nil {
return fmt.Errorf("error loading config file %s: %w", path, err)
}
Expand Down Expand Up @@ -718,6 +722,10 @@ func trimBOM(f []byte) []byte {
// together with a flag denoting if the file is from a remote location such
// as a web server.
func LoadConfigFile(config string) ([]byte, bool, error) {
	// A retry count of 0 asks the URL fetch path to use its default number
	// of attempts.
	const useDefaultAttempts = 0

	return LoadConfigFileWithRetries(config, useDefaultAttempts)
}

func LoadConfigFileWithRetries(config string, urlRetryAttempts int) ([]byte, bool, error) {
if fetchURLRe.MatchString(config) {
u, err := url.Parse(config)
if err != nil {
Expand All @@ -726,7 +734,7 @@ func LoadConfigFile(config string) ([]byte, bool, error) {

switch u.Scheme {
case "https", "http":
data, err := fetchConfig(u)
data, err := fetchConfig(u, urlRetryAttempts)
return data, true, err
default:
return nil, true, fmt.Errorf("scheme %q not supported", u.Scheme)
Expand All @@ -747,7 +755,7 @@ func LoadConfigFile(config string) ([]byte, bool, error) {
return buffer, false, nil
}

func fetchConfig(u *url.URL) ([]byte, error) {
func fetchConfig(u *url.URL, urlRetryAttempts int) ([]byte, error) {
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, err
Expand All @@ -759,38 +767,53 @@ func fetchConfig(u *url.URL) ([]byte, error) {
req.Header.Add("Accept", "application/toml")
req.Header.Set("User-Agent", internal.ProductToken())

retries := 3
for i := 0; i <= retries; i++ {
body, err, retry := func() ([]byte, error, bool) {
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("retry %d of %d failed connecting to HTTP config server: %w", i, retries, err), false
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
if i < retries {
log.Printf("Error getting HTTP config. Retry %d of %d in %s. Status=%d", i, retries, httpLoadConfigRetryInterval, resp.StatusCode)
return nil, nil, true
}
return nil, fmt.Errorf("retry %d of %d failed to retrieve remote config: %s", i, retries, resp.Status), false
}
body, err := io.ReadAll(resp.Body)
return body, err, false
}()
var totalAttempts int
if urlRetryAttempts == -1 {
totalAttempts = -1
log.Printf("Using unlimited number of attempts to fetch HTTP config")
} else if urlRetryAttempts == 0 {
totalAttempts = 3
log.Printf("Using default number of attempts to fetch HTTP config: %d", totalAttempts)
} else if urlRetryAttempts > 0 {
totalAttempts = urlRetryAttempts
} else {
return nil, fmt.Errorf("invalid number of attempts: %d", urlRetryAttempts)
}

if err != nil {
return nil, err
attempt := 0
for {
body, err := requestURLConfig(req)
if err == nil {
return body, nil
}

if retry {
time.Sleep(httpLoadConfigRetryInterval)
continue
log.Printf("Error getting HTTP config (attempt %d of %d): %s", attempt, totalAttempts, err)
if urlRetryAttempts != -1 && attempt >= totalAttempts {
return nil, err
}

return body, err
time.Sleep(httpLoadConfigRetryInterval)
attempt++
}
}

// requestURLConfig performs req with http.DefaultClient and returns the
// response body.
//
// An error is returned when the request cannot be executed, when the server
// answers with any status other than 200 OK, or when reading the body fails.
// The response body is closed before the function returns.
func requestURLConfig(req *http.Request) ([]byte, error) {
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to HTTP config server: %w", err)
	}
	defer resp.Body.Close()

	// Only a plain 200 counts as success; the status text is surfaced so the
	// caller can log why the fetch failed.
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("failed to fetch HTTP config: %s", resp.Status)
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read response body: %w", err)
	}

	return body, nil
}

// parseConfig loads a TOML configuration from a provided path and
Expand Down
2 changes: 1 addition & 1 deletion config/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func TestURLRetries3Fails(t *testing.T) {
}))
defer ts.Close()

expected := fmt.Sprintf("error loading config file %s: retry 3 of 3 failed to retrieve remote config: 404 Not Found", ts.URL)
expected := fmt.Sprintf("error loading config file %s: failed to fetch HTTP config: 404 Not Found", ts.URL)

c := NewConfig()
err := c.LoadConfig(ts.URL)
Expand Down

0 comments on commit ad59290

Please sign in to comment.