Skip to content

Commit

Permalink
server: don't activate federation state replication or anti-entropy u…
Browse files Browse the repository at this point in the history
…ntil all servers are running 1.8.0+ (#8014)
  • Loading branch information
rboyer authored Jun 4, 2020
1 parent dfcf45c commit b88bd66
Show file tree
Hide file tree
Showing 8 changed files with 410 additions and 4 deletions.
25 changes: 25 additions & 0 deletions agent/consul/federation_state_endpoint.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package consul

import (
"errors"
"fmt"
"time"

Expand All @@ -11,6 +12,10 @@ import (
memdb "github.com/hashicorp/go-memdb"
)

var (
errFederationStatesNotEnabled = errors.New("Federation states are currently disabled until all servers in the datacenter support the feature")
)

// FederationState endpoint is used to manipulate federation states from all
// datacenters.
type FederationState struct {
Expand All @@ -25,6 +30,11 @@ func (c *FederationState) Apply(args *structs.FederationStateRequest, reply *boo
if done, err := c.srv.forward("FederationState.Apply", args, args, reply); done {
return err
}

if !c.srv.DatacenterSupportsFederationStates() {
return errFederationStatesNotEnabled
}

defer metrics.MeasureSince([]string{"federation_state", "apply"}, time.Now())

// Fetch the ACL token, if any.
Expand Down Expand Up @@ -69,6 +79,11 @@ func (c *FederationState) Get(args *structs.FederationStateQuery, reply *structs
if done, err := c.srv.forward("FederationState.Get", args, args, reply); done {
return err
}

if !c.srv.DatacenterSupportsFederationStates() {
return errFederationStatesNotEnabled
}

defer metrics.MeasureSince([]string{"federation_state", "get"}, time.Now())

// Fetch the ACL token, if any.
Expand Down Expand Up @@ -105,6 +120,11 @@ func (c *FederationState) List(args *structs.DCSpecificRequest, reply *structs.I
if done, err := c.srv.forward("FederationState.List", args, args, reply); done {
return err
}

if !c.srv.DatacenterSupportsFederationStates() {
return errFederationStatesNotEnabled
}

defer metrics.MeasureSince([]string{"federation_state", "list"}, time.Now())

// Fetch the ACL token, if any.
Expand Down Expand Up @@ -143,6 +163,11 @@ func (c *FederationState) ListMeshGateways(args *structs.DCSpecificRequest, repl
if done, err := c.srv.forward("FederationState.ListMeshGateways", args, args, reply); done {
return err
}

if !c.srv.DatacenterSupportsFederationStates() {
return errFederationStatesNotEnabled
}

defer metrics.MeasureSince([]string{"federation_state", "list_mesh_gateways"}, time.Now())

return c.srv.blockingQuery(
Expand Down
10 changes: 10 additions & 0 deletions agent/consul/federation_state_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,20 @@ package consul

import (
"context"
"errors"
"fmt"
"sort"
"time"

"github.com/hashicorp/consul/agent/structs"
)

var errFederationStatesNotSupported = errors.New("Not all servers in the datacenter support federation states - preventing replication")

func isErrFederationStatesNotSupported(err error) bool {
return errors.Is(err, errFederationStatesNotSupported)
}

type FederationStateReplicator struct {
srv *Server
gatewayLocator *GatewayLocator
Expand All @@ -27,6 +34,9 @@ func (r *FederationStateReplicator) MetricName() string { return "federation-sta

// FetchRemote implements IndexReplicatorDelegate.
func (r *FederationStateReplicator) FetchRemote(lastRemoteIndex uint64) (int, interface{}, uint64, error) {
if !r.srv.DatacenterSupportsFederationStates() {
return 0, nil, 0, errFederationStatesNotSupported
}
lenRemote, remote, remoteIndex, err := r.fetchRemote(lastRemoteIndex)
if r.gatewayLocator != nil {
r.gatewayLocator.SetLastFederationStateReplicationError(err)
Expand Down
61 changes: 61 additions & 0 deletions agent/consul/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1529,3 +1529,64 @@ func (s *Server) reapTombstones(index uint64) {
)
}
}

func (s *Server) DatacenterSupportsFederationStates() bool {
if atomic.LoadInt32(&s.dcSupportsFederationStates) != 0 {
return true
}

state := serversFederationStatesInfo{
supported: true,
found: false,
}

// check if they are supported in the primary dc
if s.config.PrimaryDatacenter != s.config.Datacenter {
s.router.CheckServers(s.config.PrimaryDatacenter, state.update)

if !state.supported || !state.found {
s.logger.Debug("federation states are not enabled in the primary dc")
return false
}
}

// check the servers in the local DC
s.router.CheckServers(s.config.Datacenter, state.update)

if state.supported && state.found {
atomic.StoreInt32(&s.dcSupportsFederationStates, 1)
return true
}

s.logger.Debug("federation states are not enabled in this datacenter", "datacenter", s.config.Datacenter)
return false
}

type serversFederationStatesInfo struct {
// supported indicates whether every processed server supports federation states
supported bool

// found indicates that at least one server was processed
found bool
}

func (s *serversFederationStatesInfo) update(srv *metadata.Server) bool {
if srv.Status != serf.StatusAlive && srv.Status != serf.StatusFailed {
// they are left or something so regardless we treat these servers as meeting
// the version requirement
return true
}

// mark that we processed at least one server
s.found = true

if supported, ok := srv.FeatureFlags["fs"]; ok && supported == 1 {
return true
}

// mark that at least one server does not support federation states
s.supported = false

// prevent continuing server evaluation
return false
}
4 changes: 4 additions & 0 deletions agent/consul/leader_federation_state_ae.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func (s *Server) federationStateAntiEntropySync(ctx context.Context) error {
var lastFetchIndex uint64

retryLoopBackoff(ctx.Done(), func() error {
if !s.DatacenterSupportsFederationStates() {
return nil
}

idx, err := s.federationStateAntiEntropyMaybeSync(ctx, lastFetchIndex)
if err != nil {
return err
Expand Down
Loading

0 comments on commit b88bd66

Please sign in to comment.