Skip to content

Commit

Permalink
Merge pull request #8696 from hashicorp/dnephin/fix-load-limits
Browse files Browse the repository at this point in the history
agent/consul: make Client/Server config reloading more obvious
  • Loading branch information
dnephin authored Jan 14, 2021
2 parents 964ab23 + e66af1a commit cf4fa18
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 85 deletions.
4 changes: 4 additions & 0 deletions .changelog/8696.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
```release-note:bug
config: Fixed a bug where `rpc_max_conns_per_client` could not be changed by reloading the
config.
```
2 changes: 1 addition & 1 deletion agent/acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (a *TestACLAgent) Shutdown() error {
func (a *TestACLAgent) Stats() map[string]map[string]string {
return nil
}
func (a *TestACLAgent) ReloadConfig(config *consul.Config) error {
func (a *TestACLAgent) ReloadConfig(_ consul.ReloadableConfig) error {
return fmt.Errorf("Unimplemented")
}

Expand Down
29 changes: 8 additions & 21 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ type delegate interface {
SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn structs.SnapshotReplyFn) error
Shutdown() error
Stats() map[string]map[string]string
ReloadConfig(config *consul.Config) error
ReloadConfig(config consul.ReloadableConfig) error
enterpriseDelegate
}

Expand Down Expand Up @@ -1143,7 +1143,7 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co

// Rate limiting for RPC calls.
if runtimeCfg.RPCRateLimit > 0 {
cfg.RPCRate = runtimeCfg.RPCRateLimit
cfg.RPCRateLimit = runtimeCfg.RPCRateLimit
}
if runtimeCfg.RPCMaxBurst > 0 {
cfg.RPCMaxBurst = runtimeCfg.RPCMaxBurst
Expand Down Expand Up @@ -3517,11 +3517,6 @@ func (a *Agent) DisableNodeMaintenance() {
a.logger.Info("Node left maintenance mode")
}

func (a *Agent) loadLimits(conf *config.RuntimeConfig) {
a.config.RPCRateLimit = conf.RPCRateLimit
a.config.RPCMaxBurst = conf.RPCMaxBurst
}

// ReloadConfig will atomically reload all configuration, including
// all services, checks, tokens, metadata, dnsServer configs, etc.
// It will also reload all ongoing watches.
Expand Down Expand Up @@ -3602,8 +3597,6 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error {
return fmt.Errorf("Failed reloading watches: %v", err)
}

a.loadLimits(newCfg)

a.httpConnLimiter.SetConfig(connlimit.Config{
MaxConnsPerClientIP: newCfg.HTTPMaxConnsPerClient,
})
Expand All @@ -3614,24 +3607,18 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error {
}
}

// this only gets used by the consulConfig function and since
// that is only ever done during init and reload here then
// an in place modification is safe as reloads cannot be
// concurrent due to both gaining a full lock on the stateLock
a.config.ConfigEntryBootstrap = newCfg.ConfigEntryBootstrap

err := a.reloadEnterprise(newCfg)
if err != nil {
return err
}

// create the config for the rpc server/client
consulCfg, err := newConsulConfig(a.config, a.logger)
if err != nil {
return err
cc := consul.ReloadableConfig{
RPCRateLimit: newCfg.RPCRateLimit,
RPCMaxBurst: newCfg.RPCMaxBurst,
RPCMaxConnsPerClient: newCfg.RPCMaxConnsPerClient,
ConfigEntryBootstrap: newCfg.ConfigEntryBootstrap,
}

if err := a.delegate.ReloadConfig(consulCfg); err != nil {
if err := a.delegate.ReloadConfig(cc); err != nil {
return err
}

Expand Down
23 changes: 16 additions & 7 deletions agent/agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/debug"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/structs"
Expand All @@ -37,6 +38,7 @@ import (
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
)

func makeReadOnlyAgentACL(t *testing.T, srv *HTTPHandlers) string {
Expand Down Expand Up @@ -1452,20 +1454,17 @@ func TestAgent_Reload(t *testing.T) {
`,
})

shim := &delegateConfigReloadShim{delegate: a.delegate}
a.delegate = shim
if err := a.reloadConfigInternal(cfg2); err != nil {
t.Fatalf("got error %v want nil", err)
}
if a.State.Service(structs.NewServiceID("redis-reloaded", nil)) == nil {
t.Fatal("missing redis-reloaded service")
}

if a.config.RPCRateLimit != 2 {
t.Fatalf("RPC rate not set correctly. Got %v. Want 2", a.config.RPCRateLimit)
}

if a.config.RPCMaxBurst != 200 {
t.Fatalf("RPC max burst not set correctly. Got %v. Want 200", a.config.RPCMaxBurst)
}
require.Equal(t, rate.Limit(2), shim.newCfg.RPCRateLimit)
require.Equal(t, 200, shim.newCfg.RPCMaxBurst)

for _, wp := range a.watchPlans {
if !wp.IsStopped() {
Expand All @@ -1474,6 +1473,16 @@ func TestAgent_Reload(t *testing.T) {
}
}

type delegateConfigReloadShim struct {
delegate
newCfg consul.ReloadableConfig
}

func (s *delegateConfigReloadShim) ReloadConfig(cfg consul.ReloadableConfig) error {
s.newCfg = cfg
return s.delegate.ReloadConfig(cfg)
}

// TestAgent_ReloadDoesNotTriggerWatch Ensure watches not triggered after reload
// see https://github.com/hashicorp/consul/issues/7446
func TestAgent_ReloadDoesNotTriggerWatch(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions agent/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,8 +921,8 @@ type RuntimeConfig struct {

// RPCRateLimit and RPCMaxBurst control how frequently RPC calls are allowed
// to happen. In any large enough time interval, rate limiter limits the
// rate to RPCRate tokens per second, with a maximum burst size of
// RPCMaxBurst events. As a special case, if RPCRate == Inf (the infinite
// rate to RPCRateLimit tokens per second, with a maximum burst size of
// RPCMaxBurst events. As a special case, if RPCRateLimit == Inf (the infinite
// rate), RPCMaxBurst is ignored.
//
// See https://en.wikipedia.org/wiki/Token_bucket for more about token
Expand Down
6 changes: 3 additions & 3 deletions agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func NewClient(config *Config, deps Deps) (*Client, error) {
tlsConfigurator: deps.TLSConfigurator,
}

c.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))
c.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst))

if err := c.initEnterprise(); err != nil {
c.Shutdown()
Expand Down Expand Up @@ -403,7 +403,7 @@ func (c *Client) GetLANCoordinate() (lib.CoordinateSet, error) {

// ReloadConfig is used to have the Client do an online reload of
// relevant configuration information
func (c *Client) ReloadConfig(config *Config) error {
c.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))
func (c *Client) ReloadConfig(config ReloadableConfig) error {
c.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst))
return nil
}
39 changes: 21 additions & 18 deletions agent/consul/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ import (
"testing"
"time"

"github.com/hashicorp/go-hclog"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"

"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/structs"
Expand All @@ -18,11 +24,6 @@ import (
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/go-hclog"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
)

func testClientConfig(t *testing.T) (string, *Config) {
Expand Down Expand Up @@ -532,7 +533,7 @@ func TestClient_RPC_RateLimit(t *testing.T) {
testrpc.WaitForLeader(t, s1.RPC, "dc1")

_, conf2 := testClientConfig(t)
conf2.RPCRate = 2
conf2.RPCRateLimit = 2
conf2.RPCMaxBurst = 2
c1 := newClient(t, conf2)

Expand Down Expand Up @@ -601,7 +602,7 @@ func TestClient_SnapshotRPC_RateLimit(t *testing.T) {
testrpc.WaitForLeader(t, s1.RPC, "dc1")

_, conf1 := testClientConfig(t)
conf1.RPCRate = 2
conf1.RPCRateLimit = 2
conf1.RPCMaxBurst = 2
c1 := newClient(t, conf1)

Expand Down Expand Up @@ -762,23 +763,25 @@ func TestClientServer_UserEvent(t *testing.T) {
}
}

func TestClient_Reload(t *testing.T) {
t.Parallel()
dir1, c := testClientWithConfig(t, func(c *Config) {
c.RPCRate = 500
c.RPCMaxBurst = 5000
})
defer os.RemoveAll(dir1)
defer c.Shutdown()
func TestClient_ReloadConfig(t *testing.T) {
_, cfg := testClientConfig(t)
cfg.RPCRateLimit = rate.Limit(500)
cfg.RPCMaxBurst = 5000
deps := newDefaultDeps(t, &Config{NodeName: "node1", Datacenter: "dc1"})
c, err := NewClient(cfg, deps)
require.NoError(t, err)

limiter := c.rpcLimiter.Load().(*rate.Limiter)
require.Equal(t, rate.Limit(500), limiter.Limit())
require.Equal(t, 5000, limiter.Burst())

c.config.RPCRate = 1000
c.config.RPCMaxBurst = 10000
rc := ReloadableConfig{
RPCRateLimit: 1000,
RPCMaxBurst: 10000,
RPCMaxConnsPerClient: 0,
}
require.NoError(t, c.ReloadConfig(rc))

require.NoError(t, c.ReloadConfig(c.config))
limiter = c.rpcLimiter.Load().(*rate.Limiter)
require.Equal(t, rate.Limit(1000), limiter.Limit())
require.Equal(t, 10000, limiter.Burst())
Expand Down
23 changes: 16 additions & 7 deletions agent/consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,16 +416,16 @@ type Config struct {
// place, and a small jitter is applied to avoid a thundering herd.
RPCHoldTimeout time.Duration

// RPCRate and RPCMaxBurst control how frequently RPC calls are allowed
// RPCRateLimit and RPCMaxBurst control how frequently RPC calls are allowed
// to happen. In any large enough time interval, rate limiter limits the
// rate to RPCRate tokens per second, with a maximum burst size of
// RPCMaxBurst events. As a special case, if RPCRate == Inf (the infinite
// rate to RPCRateLimit tokens per second, with a maximum burst size of
// RPCMaxBurst events. As a special case, if RPCRateLimit == Inf (the infinite
// rate), RPCMaxBurst is ignored.
//
// See https://en.wikipedia.org/wiki/Token_bucket for more about token
// buckets.
RPCRate rate.Limit
RPCMaxBurst int
RPCRateLimit rate.Limit
RPCMaxBurst int

// RPCMaxConnsPerClient is the limit of how many concurrent connections are
// allowed from a single source IP.
Expand Down Expand Up @@ -582,8 +582,8 @@ func DefaultConfig() *Config {

CheckOutputMaxSize: checks.DefaultBufSize,

RPCRate: rate.Inf,
RPCMaxBurst: 1000,
RPCRateLimit: rate.Inf,
RPCMaxBurst: 1000,

TLSMinVersion: "tls10",

Expand Down Expand Up @@ -655,3 +655,12 @@ func DefaultConfig() *Config {
type RPCConfig struct {
EnableStreaming bool
}

// ReloadableConfig is the configuration that is passed to ReloadConfig when
// application config is reloaded.
type ReloadableConfig struct {
RPCRateLimit rate.Limit
RPCMaxBurst int
RPCMaxConnsPerClient int
ConfigEntryBootstrap []structs.ConfigEntry
}
9 changes: 6 additions & 3 deletions agent/consul/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,9 +729,12 @@ func TestRPC_RPCMaxConnsPerClient(t *testing.T) {
defer conn4.Close()

// Reload config with higher limit
newCfg := *s1.config
newCfg.RPCMaxConnsPerClient = 10
require.NoError(t, s1.ReloadConfig(&newCfg))
rc := ReloadableConfig{
RPCRateLimit: s1.config.RPCRateLimit,
RPCMaxBurst: s1.config.RPCMaxBurst,
RPCMaxConnsPerClient: 10,
}
require.NoError(t, s1.ReloadConfig(rc))

// Now another conn should be allowed
conn5 := connectClient(t, s1, tc.magicByte, tc.tlsEnabled, true, "conn5")
Expand Down
6 changes: 3 additions & 3 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
return nil, err
}

s.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))
s.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst))

configReplicatorConfig := ReplicatorConfig{
Name: logging.ConfigEntry,
Expand Down Expand Up @@ -1384,8 +1384,8 @@ func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) {

// ReloadConfig is used to have the Server do an online reload of
// relevant configuration information
func (s *Server) ReloadConfig(config *Config) error {
s.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))
func (s *Server) ReloadConfig(config ReloadableConfig) error {
s.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst))
s.rpcConnLimiter.SetConfig(connlimit.Config{
MaxConnsPerClientIP: config.RPCMaxConnsPerClient,
})
Expand Down
Loading

0 comments on commit cf4fa18

Please sign in to comment.