Skip to content

Commit

Permalink
MEDIUM: Add options to configure haproxy timeouts (#67)
Browse files Browse the repository at this point in the history
The options are available for both incomming requests and upstreams via
the "read_timeout" and "connect_timeout" options. Default values (60s
and 30s) are kept if the options are not set.
  • Loading branch information
ShimmerGlass authored Aug 31, 2020
1 parent 83f8b09 commit 97000a0
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 48 deletions.
16 changes: 11 additions & 5 deletions consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/x509"
"fmt"
"reflect"
"time"
)

type Config struct {
Expand All @@ -19,6 +20,8 @@ type Upstream struct {
LocalBindAddress string
LocalBindPort int
Protocol string
ConnectTimeout time.Duration
ReadTimeout time.Duration

TLS

Expand Down Expand Up @@ -46,11 +49,14 @@ func (n UpstreamNode) Equal(o UpstreamNode) bool {
}

type Downstream struct {
LocalBindAddress string
LocalBindPort int
Protocol string
TargetAddress string
TargetPort int
LocalBindAddress string
LocalBindPort int
Protocol string
TargetAddress string
TargetPort int
ConnectTimeout time.Duration
ReadTimeout time.Duration

EnableForwardFor bool
AppNameHeaderName string

Expand Down
80 changes: 63 additions & 17 deletions consul/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ import (

"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/command/connect/proxy"
log "github.com/sirupsen/logrus"
)

const (
defaultDownstreamBindAddr = "0.0.0.0"
defaultUpstreamBindAddr = "127.0.0.1"
DefaultDownstreamBindAddr = "0.0.0.0"
DefaultUpstreamBindAddr = "127.0.0.1"
DefaultReadTimeout = 60 * time.Second
DefaultConnectTimeout = 30 * time.Second

errorWaitTime = 5 * time.Second
preparedQueryPollInterval = 30 * time.Second
Expand All @@ -26,6 +29,8 @@ type upstream struct {
Datacenter string
Protocol string
Nodes []*api.ServiceEntry
ReadTimeout time.Duration
ConnectTimeout time.Duration

done bool
}
Expand All @@ -38,6 +43,8 @@ type downstream struct {
TargetPort int
EnableForwardFor bool
AppNameHeaderName string
ReadTimeout time.Duration
ConnectTimeout time.Duration
}

type certLeaf struct {
Expand Down Expand Up @@ -115,9 +122,11 @@ func (w *Watcher) Run() error {
}

func (w *Watcher) handleProxyChange(first bool, srv *api.AgentService) {
w.downstream.LocalBindAddress = defaultDownstreamBindAddr
w.downstream.LocalBindAddress = DefaultDownstreamBindAddr
w.downstream.LocalBindPort = srv.Port
w.downstream.TargetAddress = defaultUpstreamBindAddr
w.downstream.TargetAddress = DefaultUpstreamBindAddr
w.downstream.ReadTimeout = DefaultReadTimeout
w.downstream.ConnectTimeout = DefaultConnectTimeout

if srv.Proxy != nil && srv.Proxy.Config != nil {
if c, ok := srv.Proxy.Config["protocol"].(string); ok {
Expand All @@ -135,6 +144,22 @@ func (w *Watcher) handleProxyChange(first bool, srv *api.AgentService) {
if a, ok := srv.Proxy.Config["appname_header"].(string); ok {
w.downstream.AppNameHeaderName = a
}
if a, ok := srv.Proxy.Config["connect_timeout"].(string); ok {
to, err := time.ParseDuration(a)
if err != nil {
log.Errorf("bad connect_timeout value in config: %s. Using default: %s", err, DefaultConnectTimeout)
} else {
w.downstream.ConnectTimeout = to
}
}
if a, ok := srv.Proxy.Config["read_timeout"].(string); ok {
to, err := time.ParseDuration(a)
if err != nil {
log.Errorf("bad read_timeout value in config: %s. Using default: %s", err, DefaultReadTimeout)
} else {
w.downstream.ReadTimeout = to
}
}
}

keep := make(map[string]bool)
Expand Down Expand Up @@ -168,20 +193,46 @@ func (w *Watcher) handleProxyChange(first bool, srv *api.AgentService) {
}
}

func (w *Watcher) startUpstreamService(up api.Upstream, name string) {
w.log.Infof("consul: watching upstream for service %s", up.DestinationName)

func (w *Watcher) buildUpstream(up api.Upstream, name string) *upstream {
u := &upstream{
LocalBindAddress: up.LocalBindAddress,
LocalBindPort: up.LocalBindPort,
Name: name,
Datacenter: up.Datacenter,
ReadTimeout: DefaultReadTimeout,
ConnectTimeout: DefaultConnectTimeout,
}

if p, ok := up.Config["protocol"].(string); ok {
u.Protocol = p
}

if a, ok := up.Config["read_timeout"].(string); ok {
to, err := time.ParseDuration(a)
if err != nil {
log.Errorf("upstream %s: bad read_timeout value in config: %s. Using default: %s", name, err, DefaultReadTimeout)
} else {
u.ReadTimeout = to
}
}

if a, ok := up.Config["connect_timeout"].(string); ok {
to, err := time.ParseDuration(a)
if err != nil {
log.Errorf("upstream %s: bad connect_timeout value in config: %s. Using default: %s", name, err, DefaultConnectTimeout)
} else {
u.ConnectTimeout = to
}
}

return u
}

func (w *Watcher) startUpstreamService(up api.Upstream, name string) {
w.log.Infof("consul: watching upstream for service %s", up.DestinationName)

u := w.buildUpstream(up, name)

w.lock.Lock()
w.upstreams[name] = u
w.lock.Unlock()
Expand Down Expand Up @@ -219,16 +270,7 @@ func (w *Watcher) startUpstreamService(up api.Upstream, name string) {
func (w *Watcher) startUpstreamPreparedQuery(up api.Upstream, name string) {
w.log.Infof("consul: watching upstream for prepared_query %s", up.DestinationName)

u := &upstream{
LocalBindAddress: up.LocalBindAddress,
LocalBindPort: up.LocalBindPort,
Name: name,
Datacenter: up.Datacenter,
}

if p, ok := up.Config["protocol"].(string); ok {
u.Protocol = p
}
u := w.buildUpstream(up, name)

interval := preparedQueryPollInterval
if p, ok := up.Config["poll_interval"].(string); ok {
Expand Down Expand Up @@ -429,6 +471,8 @@ func (w *Watcher) genCfg() Config {
TargetAddress: w.downstream.TargetAddress,
TargetPort: w.downstream.TargetPort,
Protocol: w.downstream.Protocol,
ConnectTimeout: w.downstream.ConnectTimeout,
ReadTimeout: w.downstream.ReadTimeout,
EnableForwardFor: w.downstream.EnableForwardFor,
AppNameHeaderName: w.downstream.AppNameHeaderName,

Expand All @@ -446,6 +490,8 @@ func (w *Watcher) genCfg() Config {
LocalBindAddress: up.LocalBindAddress,
LocalBindPort: up.LocalBindPort,
Protocol: up.Protocol,
ConnectTimeout: up.ConnectTimeout,
ReadTimeout: up.ConnectTimeout,
TLS: TLS{
CAs: w.certCAs,
Cert: w.leaf.Cert,
Expand Down
6 changes: 3 additions & 3 deletions haproxy/state/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func generateDownstream(opts Options, certStore CertificateStore, cfg consul.Dow
Frontend: models.Frontend{
Name: feName,
DefaultBackend: beName,
ClientTimeout: &clientTimeout,
ClientTimeout: int64p(int(cfg.ReadTimeout.Milliseconds())),
Mode: feMode,
Httplog: opts.LogRequests,
},
Expand Down Expand Up @@ -85,8 +85,8 @@ func generateDownstream(opts Options, certStore CertificateStore, cfg consul.Dow
be := Backend{
Backend: models.Backend{
Name: beName,
ServerTimeout: &serverTimeout,
ConnectTimeout: &connectTimeout,
ServerTimeout: int64p(int(cfg.ReadTimeout.Milliseconds())),
ConnectTimeout: int64p(int(cfg.ConnectTimeout.Milliseconds())),
Mode: beMode,
Forwardfor: forwardFor,
},
Expand Down
20 changes: 12 additions & 8 deletions haproxy/state/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@ func GetTestConsulConfig() consul.Config {
TargetAddress: "128.0.0.5",
TargetPort: 8888,
AppNameHeaderName: "X-App",
ConnectTimeout: consul.DefaultConnectTimeout,
ReadTimeout: consul.DefaultReadTimeout,
},
Upstreams: []consul.Upstream{
consul.Upstream{
Name: "service_1",
LocalBindAddress: "127.0.0.1",
LocalBindPort: 10000,
ConnectTimeout: consul.DefaultConnectTimeout,
ReadTimeout: consul.DefaultReadTimeout,
Nodes: []consul.UpstreamNode{
consul.UpstreamNode{
Host: "1.2.3.4",
Expand All @@ -50,7 +54,7 @@ func GetTestHAConfig(baseCfg string, certVersion string) State {
Frontend: models.Frontend{
Name: "front_downstream",
DefaultBackend: "back_downstream",
ClientTimeout: &clientTimeout,
ClientTimeout: int64p(int(consul.DefaultReadTimeout.Milliseconds())),
Mode: models.FrontendModeHTTP,
Httplog: true,
},
Expand Down Expand Up @@ -91,7 +95,7 @@ func GetTestHAConfig(baseCfg string, certVersion string) State {
Frontend: models.Frontend{
Name: "front_service_1",
DefaultBackend: "back_service_1",
ClientTimeout: &clientTimeout,
ClientTimeout: int64p(int(consul.DefaultReadTimeout.Milliseconds())),
Mode: models.FrontendModeHTTP,
Httplog: true,
},
Expand All @@ -115,8 +119,8 @@ func GetTestHAConfig(baseCfg string, certVersion string) State {
Backend{
Backend: models.Backend{
Name: "back_downstream",
ServerTimeout: &serverTimeout,
ConnectTimeout: &connectTimeout,
ServerTimeout: int64p(int(consul.DefaultReadTimeout.Milliseconds())),
ConnectTimeout: int64p(int(consul.DefaultConnectTimeout.Milliseconds())),
Mode: models.BackendModeHTTP,
},
Servers: []models.Server{
Expand Down Expand Up @@ -146,8 +150,8 @@ func GetTestHAConfig(baseCfg string, certVersion string) State {
Backend{
Backend: models.Backend{
Name: "back_service_1",
ServerTimeout: &serverTimeout,
ConnectTimeout: &connectTimeout,
ServerTimeout: int64p(int(consul.DefaultReadTimeout.Milliseconds())),
ConnectTimeout: int64p(int(consul.DefaultConnectTimeout.Milliseconds())),
Mode: models.BackendModeHTTP,
Balance: &models.Balance{
Algorithm: models.BalanceAlgorithmLeastconn,
Expand Down Expand Up @@ -189,8 +193,8 @@ func GetTestHAConfig(baseCfg string, certVersion string) State {
Backend{
Backend: models.Backend{
Name: "spoe_back",
ServerTimeout: &spoeTimeout,
ConnectTimeout: &spoeTimeout,
ServerTimeout: int64p(int(spoeTimeout.Milliseconds())),
ConnectTimeout: int64p(int(spoeTimeout.Milliseconds())),
Mode: models.BackendModeTCP,
},
Servers: []models.Server{
Expand Down
9 changes: 7 additions & 2 deletions haproxy/state/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@ package state
import (
"fmt"
"sort"
"time"

"github.com/haproxytech/haproxy-consul-connect/consul"
"github.com/haproxytech/models"
)

const (
spoeTimeout = 30 * time.Second
)

type Options struct {
EnableIntentions bool
LogRequests bool
Expand Down Expand Up @@ -44,8 +49,8 @@ func Generate(opts Options, certStore CertificateStore, oldState State, cfg cons
newState.Backends = append(newState.Backends, Backend{
Backend: models.Backend{
Name: "spoe_back",
ServerTimeout: int64p(30000),
ConnectTimeout: int64p(30000),
ServerTimeout: int64p(int(spoeTimeout.Milliseconds())),
ConnectTimeout: int64p(int(spoeTimeout.Milliseconds())),
Mode: models.BackendModeTCP,
},
Servers: []models.Server{
Expand Down
10 changes: 0 additions & 10 deletions haproxy/state/timeouts.go

This file was deleted.

6 changes: 3 additions & 3 deletions haproxy/state/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func generateUpstream(opts Options, certStore CertificateStore, cfg consul.Upstr
Frontend: models.Frontend{
Name: feName,
DefaultBackend: beName,
ClientTimeout: &clientTimeout,
ClientTimeout: int64p(int(cfg.ReadTimeout.Milliseconds())),
Mode: feMode,
Httplog: opts.LogRequests,
},
Expand All @@ -48,8 +48,8 @@ func generateUpstream(opts Options, certStore CertificateStore, cfg consul.Upstr
be := Backend{
Backend: models.Backend{
Name: beName,
ServerTimeout: &serverTimeout,
ConnectTimeout: &connectTimeout,
ServerTimeout: int64p(int(cfg.ReadTimeout.Milliseconds())),
ConnectTimeout: int64p(int(cfg.ConnectTimeout.Milliseconds())),
Balance: &models.Balance{
Algorithm: models.BalanceAlgorithmLeastconn,
},
Expand Down

0 comments on commit 97000a0

Please sign in to comment.