From 97000a0ccad02b062cf603a251f1e598825d457f Mon Sep 17 00:00:00 2001 From: Mathilde Gilles Date: Mon, 31 Aug 2020 15:48:30 +0200 Subject: [PATCH] MEDIUM: Add options to configure haproxy timeouts (#67) The options are available for both incomming requests and upstreams via the "read_timeout" and "connect_timeout" options. Default values (60s and 30s) are kept if the options are not set. --- consul/config.go | 16 ++++--- consul/watcher.go | 80 ++++++++++++++++++++++++++-------- haproxy/state/downstream.go | 6 +-- haproxy/state/snapshot_test.go | 20 +++++---- haproxy/state/states.go | 9 +++- haproxy/state/timeouts.go | 10 ----- haproxy/state/upstream.go | 6 +-- 7 files changed, 99 insertions(+), 48 deletions(-) delete mode 100644 haproxy/state/timeouts.go diff --git a/consul/config.go b/consul/config.go index d504f4c..17e87a5 100644 --- a/consul/config.go +++ b/consul/config.go @@ -4,6 +4,7 @@ import ( "crypto/x509" "fmt" "reflect" + "time" ) type Config struct { @@ -19,6 +20,8 @@ type Upstream struct { LocalBindAddress string LocalBindPort int Protocol string + ConnectTimeout time.Duration + ReadTimeout time.Duration TLS @@ -46,11 +49,14 @@ func (n UpstreamNode) Equal(o UpstreamNode) bool { } type Downstream struct { - LocalBindAddress string - LocalBindPort int - Protocol string - TargetAddress string - TargetPort int + LocalBindAddress string + LocalBindPort int + Protocol string + TargetAddress string + TargetPort int + ConnectTimeout time.Duration + ReadTimeout time.Duration + EnableForwardFor bool AppNameHeaderName string diff --git a/consul/watcher.go b/consul/watcher.go index 6211007..20f99d4 100644 --- a/consul/watcher.go +++ b/consul/watcher.go @@ -9,11 +9,14 @@ import ( "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/command/connect/proxy" + log "github.com/sirupsen/logrus" ) const ( - defaultDownstreamBindAddr = "0.0.0.0" - defaultUpstreamBindAddr = "127.0.0.1" + DefaultDownstreamBindAddr = "0.0.0.0" + DefaultUpstreamBindAddr = "127.0.0.1" + DefaultReadTimeout = 60 * time.Second + DefaultConnectTimeout = 30 * time.Second errorWaitTime = 5 * time.Second preparedQueryPollInterval = 30 * time.Second @@ -26,6 +29,8 @@ type upstream struct { Datacenter string Protocol string Nodes []*api.ServiceEntry + ReadTimeout time.Duration + ConnectTimeout time.Duration done bool } @@ -38,6 +43,8 @@ type downstream struct { TargetPort int EnableForwardFor bool AppNameHeaderName string + ReadTimeout time.Duration + ConnectTimeout time.Duration } type certLeaf struct { @@ -115,9 +122,11 @@ func (w *Watcher) Run() error { } func (w *Watcher) handleProxyChange(first bool, srv *api.AgentService) { - w.downstream.LocalBindAddress = defaultDownstreamBindAddr + w.downstream.LocalBindAddress = DefaultDownstreamBindAddr w.downstream.LocalBindPort = srv.Port - w.downstream.TargetAddress = defaultUpstreamBindAddr + w.downstream.TargetAddress = DefaultUpstreamBindAddr + w.downstream.ReadTimeout = DefaultReadTimeout + w.downstream.ConnectTimeout = DefaultConnectTimeout if srv.Proxy != nil && srv.Proxy.Config != nil { if c, ok := srv.Proxy.Config["protocol"].(string); ok { @@ -135,6 +144,22 @@ func (w *Watcher) handleProxyChange(first bool, srv *api.AgentService) { if a, ok := srv.Proxy.Config["appname_header"].(string); ok { w.downstream.AppNameHeaderName = a } + if a, ok := srv.Proxy.Config["connect_timeout"].(string); ok { + to, err := time.ParseDuration(a) + if err != nil { + log.Errorf("bad connect_timeout value in config: %s. Using default: %s", err, DefaultConnectTimeout) + } else { + w.downstream.ConnectTimeout = to + } + } + if a, ok := srv.Proxy.Config["read_timeout"].(string); ok { + to, err := time.ParseDuration(a) + if err != nil { + log.Errorf("bad read_timeout value in config: %s. Using default: %s", err, DefaultReadTimeout) + } else { + w.downstream.ReadTimeout = to + } + } } keep := make(map[string]bool) @@ -168,20 +193,46 @@ func (w *Watcher) handleProxyChange(first bool, srv *api.AgentService) { } } -func (w *Watcher) startUpstreamService(up api.Upstream, name string) { - w.log.Infof("consul: watching upstream for service %s", up.DestinationName) - +func (w *Watcher) buildUpstream(up api.Upstream, name string) *upstream { u := &upstream{ LocalBindAddress: up.LocalBindAddress, LocalBindPort: up.LocalBindPort, Name: name, Datacenter: up.Datacenter, + ReadTimeout: DefaultReadTimeout, + ConnectTimeout: DefaultConnectTimeout, } if p, ok := up.Config["protocol"].(string); ok { u.Protocol = p } + if a, ok := up.Config["read_timeout"].(string); ok { + to, err := time.ParseDuration(a) + if err != nil { + log.Errorf("upstream %s: bad read_timeout value in config: %s. Using default: %s", name, err, DefaultReadTimeout) + } else { + u.ReadTimeout = to + } + } + + if a, ok := up.Config["connect_timeout"].(string); ok { + to, err := time.ParseDuration(a) + if err != nil { + log.Errorf("upstream %s: bad connect_timeout value in config: %s. Using default: %s", name, err, DefaultConnectTimeout) + } else { + u.ConnectTimeout = to + } + } + + return u +} + +func (w *Watcher) startUpstreamService(up api.Upstream, name string) { + w.log.Infof("consul: watching upstream for service %s", up.DestinationName) + + u := w.buildUpstream(up, name) + w.lock.Lock() w.upstreams[name] = u w.lock.Unlock() @@ -219,16 +270,7 @@ func (w *Watcher) startUpstreamService(up api.Upstream, name string) { func (w *Watcher) startUpstreamPreparedQuery(up api.Upstream, name string) { w.log.Infof("consul: watching upstream for prepared_query %s", up.DestinationName) - u := &upstream{ - LocalBindAddress: up.LocalBindAddress, - LocalBindPort: up.LocalBindPort, - Name: name, - Datacenter: up.Datacenter, - } - - if p, ok := up.Config["protocol"].(string); ok { - u.Protocol = p - } + u := w.buildUpstream(up, name) interval := preparedQueryPollInterval if p, ok := up.Config["poll_interval"].(string); ok { @@ -429,6 +471,8 @@ func (w *Watcher) genCfg() Config { TargetAddress: w.downstream.TargetAddress, TargetPort: w.downstream.TargetPort, Protocol: w.downstream.Protocol, + ConnectTimeout: w.downstream.ConnectTimeout, + ReadTimeout: w.downstream.ReadTimeout, EnableForwardFor: w.downstream.EnableForwardFor, AppNameHeaderName: w.downstream.AppNameHeaderName, @@ -446,6 +490,8 @@ func (w *Watcher) genCfg() Config { LocalBindAddress: up.LocalBindAddress, LocalBindPort: up.LocalBindPort, Protocol: up.Protocol, + ConnectTimeout: up.ConnectTimeout, + ReadTimeout: up.ConnectTimeout, TLS: TLS{ CAs: w.certCAs, Cert: w.leaf.Cert, diff --git a/haproxy/state/downstream.go b/haproxy/state/downstream.go index be9d41e..9b5007f 100644 --- a/haproxy/state/downstream.go +++ b/haproxy/state/downstream.go @@ -28,7 +28,7 @@ func generateDownstream(opts Options, certStore CertificateStore, cfg consul.Dow Frontend: models.Frontend{ Name: feName, DefaultBackend: beName, - ClientTimeout: &clientTimeout, + ClientTimeout: int64p(int(cfg.ReadTimeout.Milliseconds())), Mode: feMode, Httplog: opts.LogRequests, }, @@ -85,8 +85,8 @@ func generateDownstream(opts Options, certStore CertificateStore, cfg consul.Dow be := Backend{ Backend: models.Backend{ Name: beName, - ServerTimeout: &serverTimeout, - ConnectTimeout: &connectTimeout, + ServerTimeout: int64p(int(cfg.ReadTimeout.Milliseconds())), + ConnectTimeout: int64p(int(cfg.ConnectTimeout.Milliseconds())), Mode: beMode, Forwardfor: forwardFor, }, diff --git a/haproxy/state/snapshot_test.go b/haproxy/state/snapshot_test.go index eda74bf..2774025 100644 --- a/haproxy/state/snapshot_test.go +++ b/haproxy/state/snapshot_test.go @@ -18,12 +18,16 @@ func GetTestConsulConfig() consul.Config { TargetAddress: "128.0.0.5", TargetPort: 8888, AppNameHeaderName: "X-App", + ConnectTimeout: consul.DefaultConnectTimeout, + ReadTimeout: consul.DefaultReadTimeout, }, Upstreams: []consul.Upstream{ consul.Upstream{ Name: "service_1", LocalBindAddress: "127.0.0.1", LocalBindPort: 10000, + ConnectTimeout: consul.DefaultConnectTimeout, + ReadTimeout: consul.DefaultReadTimeout, Nodes: []consul.UpstreamNode{ consul.UpstreamNode{ Host: "1.2.3.4", @@ -50,7 +54,7 @@ func GetTestHAConfig(baseCfg string, certVersion string) State { Frontend: models.Frontend{ Name: "front_downstream", DefaultBackend: "back_downstream", - ClientTimeout: &clientTimeout, + ClientTimeout: int64p(int(consul.DefaultReadTimeout.Milliseconds())), Mode: models.FrontendModeHTTP, Httplog: true, }, @@ -91,7 +95,7 @@ func GetTestHAConfig(baseCfg string, certVersion string) State { Frontend: models.Frontend{ Name: "front_service_1", DefaultBackend: "back_service_1", - ClientTimeout: &clientTimeout, + ClientTimeout: int64p(int(consul.DefaultReadTimeout.Milliseconds())), Mode: models.FrontendModeHTTP, Httplog: true, }, @@ -115,8 +119,8 @@ func GetTestHAConfig(baseCfg string, certVersion string) State { Backend{ Backend: models.Backend{ Name: "back_downstream", - ServerTimeout: &serverTimeout, - ConnectTimeout: &connectTimeout, + ServerTimeout: int64p(int(consul.DefaultReadTimeout.Milliseconds())), + ConnectTimeout: int64p(int(consul.DefaultConnectTimeout.Milliseconds())), Mode: models.BackendModeHTTP, }, Servers: []models.Server{ @@ -146,8 +150,8 @@ func GetTestHAConfig(baseCfg string, certVersion string) State { Backend{ Backend: models.Backend{ Name: "back_service_1", - ServerTimeout: &serverTimeout, - ConnectTimeout: &connectTimeout, + ServerTimeout: int64p(int(consul.DefaultReadTimeout.Milliseconds())), + ConnectTimeout: int64p(int(consul.DefaultConnectTimeout.Milliseconds())), Mode: models.BackendModeHTTP, Balance: &models.Balance{ Algorithm: models.BalanceAlgorithmLeastconn, @@ -189,8 +193,8 @@ func GetTestHAConfig(baseCfg string, certVersion string) State { Backend{ Backend: models.Backend{ Name: "spoe_back", - ServerTimeout: &spoeTimeout, - ConnectTimeout: &spoeTimeout, + ServerTimeout: int64p(int(spoeTimeout.Milliseconds())), + ConnectTimeout: int64p(int(spoeTimeout.Milliseconds())), Mode: models.BackendModeTCP, }, Servers: []models.Server{ diff --git a/haproxy/state/states.go b/haproxy/state/states.go index 844b254..238c0ca 100644 --- a/haproxy/state/states.go +++ b/haproxy/state/states.go @@ -3,11 +3,16 @@ package state import ( "fmt" "sort" + "time" "github.com/haproxytech/haproxy-consul-connect/consul" "github.com/haproxytech/models" ) +const ( + spoeTimeout = 30 * time.Second +) + type Options struct { EnableIntentions bool LogRequests bool @@ -44,8 +49,8 @@ func Generate(opts Options, certStore CertificateStore, oldState State, cfg cons newState.Backends = append(newState.Backends, Backend{ Backend: models.Backend{ Name: "spoe_back", - ServerTimeout: int64p(30000), - ConnectTimeout: int64p(30000), + ServerTimeout: int64p(int(spoeTimeout.Milliseconds())), + ConnectTimeout: int64p(int(spoeTimeout.Milliseconds())), Mode: models.BackendModeTCP, }, Servers: []models.Server{ diff --git a/haproxy/state/timeouts.go b/haproxy/state/timeouts.go deleted file mode 100644 index 6489508..0000000 --- a/haproxy/state/timeouts.go +++ /dev/null @@ -1,10 +0,0 @@ -package state - -import "time" - -var ( - connectTimeout = int64(time.Second.Seconds() * 1000) - clientTimeout = int64((30 * time.Second).Seconds() * 1000) - serverTimeout = int64((60 * time.Second).Seconds() * 1000) - spoeTimeout = int64((30 * time.Second).Seconds() * 1000) -) diff --git a/haproxy/state/upstream.go b/haproxy/state/upstream.go index 6a43491..288a07f 100644 --- a/haproxy/state/upstream.go +++ b/haproxy/state/upstream.go @@ -24,7 +24,7 @@ func generateUpstream(opts Options, certStore CertificateStore, cfg consul.Upstr Frontend: models.Frontend{ Name: feName, DefaultBackend: beName, - ClientTimeout: &clientTimeout, + ClientTimeout: int64p(int(cfg.ReadTimeout.Milliseconds())), Mode: feMode, Httplog: opts.LogRequests, }, @@ -48,8 +48,8 @@ func generateUpstream(opts Options, certStore CertificateStore, cfg consul.Upstr be := Backend{ Backend: models.Backend{ Name: beName, - ServerTimeout: &serverTimeout, - ConnectTimeout: &connectTimeout, + ServerTimeout: int64p(int(cfg.ReadTimeout.Milliseconds())), + ConnectTimeout: int64p(int(cfg.ConnectTimeout.Milliseconds())), Balance: &models.Balance{ Algorithm: models.BalanceAlgorithmLeastconn, },