Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a way to override default consistency level per agent #3920

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ca9c924
Added a way to override default consistency level per agent
pierresouchay Feb 26, 2018
ac6b75f
Allow to have a specific Consistency level for discovery operations
pierresouchay Mar 1, 2018
523c3ff
Added missing fixes in unit tests
pierresouchay Mar 1, 2018
2fd21a3
Added Unit tests
pierresouchay Mar 2, 2018
52a7ca8
Output from test as we run to stop travis killing us and try no paral…
banks Feb 20, 2018
dcd3bf5
Only output sparse lines to keep Travis happy while logging verbosely…
banks Feb 20, 2018
726a231
Portability!
banks Feb 20, 2018
aaec358
try enabling sudo in Travis to run builds in GCE
pearkes Feb 20, 2018
dc4b0e0
Try parallel packages but not tests
banks Feb 21, 2018
49102a7
Split the heavy test packages out to their own Jobs.
banks Feb 21, 2018
1d9e37e
Use relative paths as Travis doesn't setup GOPATH right
banks Feb 21, 2018
b98655e
Travis evaluates ENV before cloning git repo and cding so we need to …
banks Feb 21, 2018
b666761
Fix test running in non-bash shells
banks Feb 22, 2018
937d7f4
remove old pkgs and put deps of missing packages in vendor.json
alvin-huang Feb 23, 2018
843521f
website: override automatic linking of list items for softlayer dc
pearkes Feb 27, 2018
c1dd2d4
Update CHANGELOG.md
banks Feb 28, 2018
5ba6d78
Fixed Unit test TestSanitize
pierresouchay Mar 2, 2018
84e4f6a
Added explicit default value in comment
pierresouchay Mar 2, 2018
93e01dd
Added more unit tests
pierresouchay Mar 2, 2018
0b0e0d4
Now use the same mechanism as in DNS with max_stale
pierresouchay Mar 6, 2018
f3cd6b3
Merge remote-tracking branch 'origin/master' into default_http_consis…
pierresouchay Mar 8, 2018
6a1cacd
Added X-Consul-Effective-Consistency-Level to discovery requests
pierresouchay Mar 27, 2018
ab629ab
Removed unused isAcceptableConsistency func used in previous patch
pierresouchay Mar 28, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions agent/catalog_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,17 @@ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (

var out structs.IndexedNodes
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Catalog.ListNodes", &args, &out); err != nil {
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()

s.agent.TranslateAddresses(args.Datacenter, out.Nodes)

// Use empty list instead of nil
Expand Down Expand Up @@ -142,11 +150,18 @@ func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request

var out structs.IndexedServices
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Catalog.ListServices", &args, &out); err != nil {
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_services"}, 1,
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()

// Use empty map instead of nil
if out.Services == nil {
Expand Down Expand Up @@ -190,11 +205,18 @@ func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Req
// Make the RPC request
var out structs.IndexedServiceNodes
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Catalog.ServiceNodes", &args, &out); err != nil {
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_service_nodes"}, 1,
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
s.agent.TranslateAddresses(args.Datacenter, out.ServiceNodes)

// Use empty list instead of nil
Expand Down Expand Up @@ -237,11 +259,18 @@ func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Req
// Make the RPC request
var out structs.IndexedNodeServices
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Catalog.NodeServices", &args, &out); err != nil {
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_node_services"}, 1,
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
if out.NodeServices != nil && out.NodeServices.Node != nil {
s.agent.TranslateAddresses(args.Datacenter, out.NodeServices.Node)
}
Expand Down
1 change: 1 addition & 0 deletions agent/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
DisableRemoteExec: b.boolVal(c.DisableRemoteExec),
DisableUpdateCheck: b.boolVal(c.DisableUpdateCheck),
DiscardCheckOutput: b.boolVal(c.DiscardCheckOutput),
DiscoveryMaxStale: b.durationVal("discovery_max_stale", c.DiscoveryMaxStale),
EnableAgentTLSForChecks: b.boolVal(c.EnableAgentTLSForChecks),
EnableDebug: b.boolVal(c.EnableDebug),
EnableScriptChecks: b.boolVal(c.EnableScriptChecks),
Expand Down
1 change: 1 addition & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ type Config struct {
DisableRemoteExec *bool `json:"disable_remote_exec,omitempty" hcl:"disable_remote_exec" mapstructure:"disable_remote_exec"`
DisableUpdateCheck *bool `json:"disable_update_check,omitempty" hcl:"disable_update_check" mapstructure:"disable_update_check"`
DiscardCheckOutput *bool `json:"discard_check_output" hcl:"discard_check_output" mapstructure:"discard_check_output"`
DiscoveryMaxStale *string `json:"discovery_max_stale" hcl:"discovery_max_stale" mapstructure:"discovery_max_stale"`
EnableACLReplication *bool `json:"enable_acl_replication,omitempty" hcl:"enable_acl_replication" mapstructure:"enable_acl_replication"`
EnableAgentTLSForChecks *bool `json:"enable_agent_tls_for_checks,omitempty" hcl:"enable_agent_tls_for_checks" mapstructure:"enable_agent_tls_for_checks"`
EnableDebug *bool `json:"enable_debug,omitempty" hcl:"enable_debug" mapstructure:"enable_debug"`
Expand Down
8 changes: 8 additions & 0 deletions agent/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,14 @@ type RuntimeConfig struct {
// flag: -datacenter string
Datacenter string

// Defines the maximum stale value for discovery path. Defauls to "0s".
// Discovery paths are /v1/heath/ paths
//
// If not set to 0, it will try to perform stale read and perform only a
// consistent read whenever the value is too old.
// hcl: discovery_max_stale = "duration"
DiscoveryMaxStale time.Duration

// Node name is the name we use to advertise. Defaults to hostname.
//
// NodeName is exposed via /v1/agent/self from here and
Expand Down
6 changes: 6 additions & 0 deletions agent/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2285,6 +2285,7 @@ func TestFullConfig(t *testing.T) {
"disable_remote_exec": true,
"disable_update_check": true,
"discard_check_output": true,
"discovery_max_stale": "5s",
"domain": "7W1xXSqd",
"dns_config": {
"allow_stale": true,
Expand Down Expand Up @@ -2720,6 +2721,7 @@ func TestFullConfig(t *testing.T) {
disable_remote_exec = true
disable_update_check = true
discard_check_output = true
discovery_max_stale = "5s"
domain = "7W1xXSqd"
dns_config {
allow_stale = true
Expand Down Expand Up @@ -3046,6 +3048,7 @@ func TestFullConfig(t *testing.T) {
"ae_interval": "10003s",
"check_deregister_interval_min": "27870s",
"check_reap_interval": "10662s",
"discovery_max_stale": "5s",
"segment_limit": 24705,
"segment_name_limit": 27046,
"sync_coordinate_interval_min": "27983s",
Expand Down Expand Up @@ -3100,6 +3103,7 @@ func TestFullConfig(t *testing.T) {
ae_interval = "10003s"
check_deregister_interval_min = "27870s"
check_reap_interval = "10662s"
discovery_max_stale = "5s"
segment_limit = 24705
segment_name_limit = 27046
sync_coordinate_interval_min = "27983s"
Expand Down Expand Up @@ -3305,6 +3309,7 @@ func TestFullConfig(t *testing.T) {
DisableRemoteExec: true,
DisableUpdateCheck: true,
DiscardCheckOutput: true,
DiscoveryMaxStale: 5 * time.Second,
EnableACLReplication: true,
EnableAgentTLSForChecks: true,
EnableDebug: true,
Expand Down Expand Up @@ -3985,6 +3990,7 @@ func TestSanitize(t *testing.T) {
"DisableRemoteExec": false,
"DisableUpdateCheck": false,
"DiscardCheckOutput": false,
"DiscoveryMaxStale": "0s",
"EnableACLReplication": false,
"EnableAgentTLSForChecks": false,
"EnableDebug": false,
Expand Down
28 changes: 28 additions & 0 deletions agent/health_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,16 @@ func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Req
// Make the RPC request
var out structs.IndexedHealthChecks
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Health.ChecksInState", &args, &out); err != nil {
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()

// Use empty list instead of nil
if out.HealthChecks == nil {
Expand Down Expand Up @@ -74,9 +81,16 @@ func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Reques
// Make the RPC request
var out structs.IndexedHealthChecks
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Health.NodeChecks", &args, &out); err != nil {
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()

// Use empty list instead of nil
if out.HealthChecks == nil {
Expand Down Expand Up @@ -116,9 +130,16 @@ func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Req
// Make the RPC request
var out structs.IndexedHealthChecks
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Health.ServiceChecks", &args, &out); err != nil {
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()

// Use empty list instead of nil
if out.HealthChecks == nil {
Expand Down Expand Up @@ -165,9 +186,16 @@ func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Requ
// Make the RPC request
var out structs.IndexedCheckServiceNodes
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Health.ServiceNodes", &args, &out); err != nil {
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()

// Filter to only passing if specified
if _, ok := params[api.HealthPassing]; ok {
Expand Down
40 changes: 38 additions & 2 deletions agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,12 @@ func setKnownLeader(resp http.ResponseWriter, known bool) {
resp.Header().Set("X-Consul-KnownLeader", s)
}

func setConsistency(resp http.ResponseWriter, consistency string) {
if consistency != "" {
resp.Header().Set("X-Consul-Effective-Consistency-Level", consistency)
}
}

// setLastContact is used to set the last contact header
func setLastContact(resp http.ResponseWriter, last time.Duration) {
if last < 0 {
Expand All @@ -347,6 +353,7 @@ func setMeta(resp http.ResponseWriter, m *structs.QueryMeta) {
setIndex(resp, m.Index)
setLastContact(resp, m.LastContact)
setKnownLeader(resp, m.KnownLeader)
setConsistency(resp, m.ConsistencyLevel)
}

// setHeaders is used to set canonical response header fields
Expand Down Expand Up @@ -383,13 +390,42 @@ func parseWait(resp http.ResponseWriter, req *http.Request, b *structs.QueryOpti

// parseConsistency is used to parse the ?stale and ?consistent query params.
// Returns true on error
func parseConsistency(resp http.ResponseWriter, req *http.Request, b *structs.QueryOptions) bool {
func (s *HTTPServer) parseConsistency(resp http.ResponseWriter, req *http.Request, b *structs.QueryOptions) bool {
query := req.URL.Query()
defaults := true
if _, ok := query["stale"]; ok {
b.AllowStale = true
defaults = false
}
if _, ok := query["consistent"]; ok {
b.RequireConsistent = true
defaults = false
}
if _, ok := query["leader"]; ok {
defaults = false
}
if maxStale := query.Get("max_stale"); maxStale != "" {
dur, err := time.ParseDuration(maxStale)
if err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Invalid max_stale value %q", maxStale)
return true
}
b.MaxStaleDuration = dur
if dur.Nanoseconds() > 0 {
b.AllowStale = true
defaults = false
}
}
// No specific Consistency has been specified by caller
if defaults {
path := req.URL.Path
if strings.HasPrefix(path, "/v1/catalog") || strings.HasPrefix(path, "/v1/health") {
if s.agent.config.DiscoveryMaxStale.Nanoseconds() > 0 {
b.MaxStaleDuration = s.agent.config.DiscoveryMaxStale
b.AllowStale = true
}
}
}
if b.AllowStale && b.RequireConsistent {
resp.WriteHeader(http.StatusBadRequest)
Expand Down Expand Up @@ -457,7 +493,7 @@ func (s *HTTPServer) parseMetaFilter(req *http.Request) map[string]string {
func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, dc *string, b *structs.QueryOptions) bool {
s.parseDC(req, dc)
s.parseToken(req, &b.Token)
if parseConsistency(resp, req, b) {
if s.parseConsistency(resp, req, b) {
return true
}
return parseWait(resp, req, b)
Expand Down
65 changes: 62 additions & 3 deletions agent/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,9 @@ func TestParseConsistency(t *testing.T) {
var b structs.QueryOptions

req, _ := http.NewRequest("GET", "/v1/catalog/nodes?stale", nil)
if d := parseConsistency(resp, req, &b); d {
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
if d := a.srv.parseConsistency(resp, req, &b); d {
t.Fatalf("unexpected done")
}

Expand All @@ -620,7 +622,7 @@ func TestParseConsistency(t *testing.T) {

b = structs.QueryOptions{}
req, _ = http.NewRequest("GET", "/v1/catalog/nodes?consistent", nil)
if d := parseConsistency(resp, req, &b); d {
if d := a.srv.parseConsistency(resp, req, &b); d {
t.Fatalf("unexpected done")
}

Expand All @@ -632,13 +634,70 @@ func TestParseConsistency(t *testing.T) {
}
}

// ensureConsistency check if consistency modes are correctly applied
// if maxStale < 0 => stale, without MaxStaleDuration
// if maxStale == 0 => no stale
// if maxStale > 0 => stale + check duration
func ensureConsistency(t *testing.T, a *TestAgent, path string, maxStale time.Duration, requireConsistent bool) {
t.Helper()
req, _ := http.NewRequest("GET", path, nil)
var b structs.QueryOptions
resp := httptest.NewRecorder()
if d := a.srv.parseConsistency(resp, req, &b); d {
t.Fatalf("unexpected done")
}
allowStale := maxStale.Nanoseconds() != 0
if b.AllowStale != allowStale {
t.Fatalf("Bad Allow Stale")
}
if maxStale > 0 && b.MaxStaleDuration != maxStale {
t.Fatalf("Bad MaxStaleDuration: %d VS expected %d", b.MaxStaleDuration, maxStale)
}
if b.RequireConsistent != requireConsistent {
t.Fatal("Bad Consistent")
}
}

func TestParseConsistencyAndMaxStale(t *testing.T) {
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()

// Default => Consistent
a.config.DiscoveryMaxStale = time.Duration(0)
ensureConsistency(t, a, "/v1/catalog/nodes", 0, false)
// Stale, without MaxStale
ensureConsistency(t, a, "/v1/catalog/nodes?stale", -1, false)
// Override explicitly
ensureConsistency(t, a, "/v1/catalog/nodes?max_stale=3s", 3*time.Second, false)
ensureConsistency(t, a, "/v1/catalog/nodes?stale&max_stale=3s", 3*time.Second, false)

// stale by defaul on discovery
a.config.DiscoveryMaxStale = time.Duration(7 * time.Second)
ensureConsistency(t, a, "/v1/catalog/nodes", a.config.DiscoveryMaxStale, false)
// Not in KV
ensureConsistency(t, a, "/v1/kv/my/path", 0, false)

// DiscoveryConsistencyLevel should apply
ensureConsistency(t, a, "/v1/health/service/one", a.config.DiscoveryMaxStale, false)
ensureConsistency(t, a, "/v1/catalog/service/one", a.config.DiscoveryMaxStale, false)
ensureConsistency(t, a, "/v1/catalog/services", a.config.DiscoveryMaxStale, false)

// Query path should be taken into account
ensureConsistency(t, a, "/v1/catalog/services?consistent", 0, true)
// Since stale is added, no MaxStale should be applied
ensureConsistency(t, a, "/v1/catalog/services?stale", -1, false)
ensureConsistency(t, a, "/v1/catalog/services?leader", 0, false)
}

func TestParseConsistency_Invalid(t *testing.T) {
t.Parallel()
resp := httptest.NewRecorder()
var b structs.QueryOptions

req, _ := http.NewRequest("GET", "/v1/catalog/nodes?stale&consistent", nil)
if d := parseConsistency(resp, req, &b); !d {
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
if d := a.srv.parseConsistency(resp, req, &b); !d {
t.Fatalf("expected done")
}

Expand Down
Loading