Skip to content

Commit

Permalink
nomad: ensure a unique ClusterID exists when leader (gh-6702)
Browse files Browse the repository at this point in the history
Enable any Server to lookup the unique ClusterID. If one has not been
generated, and this node is the leader, generate a UUID and attempt to
apply it through raft.

The value is not yet used anywhere in this changeset, but is a prerequisite
for gh-6701.
  • Loading branch information
shoenig committed Dec 12, 2019
1 parent e3c81b2 commit 4f1c25c
Show file tree
Hide file tree
Showing 10 changed files with 574 additions and 10 deletions.
52 changes: 52 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const (
ACLPolicySnapshot
ACLTokenSnapshot
SchedulerConfigSnapshot
ClusterMetadataSnapshot
)

// LogApplier is the definition of a function that can apply a Raft log
Expand Down Expand Up @@ -251,6 +252,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applySchedulerConfigUpdate(buf[1:], log.Index)
case structs.NodeBatchDeregisterRequestType:
return n.applyDeregisterNodeBatch(buf[1:], log.Index)
case structs.ClusterMetadataRequestType:
return n.applyClusterMetadata(buf[1:], log.Index)
}

// Check enterprise only message types.
Expand All @@ -267,6 +270,24 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
panic(fmt.Errorf("failed to apply request: %#v", buf))
}

func (n *nomadFSM) applyClusterMetadata(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "cluster_meta"}, time.Now())

var req structs.ClusterMetadata
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.ClusterSetMetadata(index, &req); err != nil {
n.logger.Error("ClusterSetMetadata failed", "error", err)
return err
}

n.logger.Trace("ClusterSetMetadata", "cluster_id", req.ClusterID, "create_time", req.CreateTime)

return nil
}

func (n *nomadFSM) applyUpsertNode(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "register_node"}, time.Now())
var req structs.NodeRegisterRequest
Expand Down Expand Up @@ -1255,6 +1276,15 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
return err
}

case ClusterMetadataSnapshot:
meta := new(structs.ClusterMetadata)
if err := dec.Decode(meta); err != nil {
return err
}
if err := restore.ClusterMetadataRestore(meta); err != nil {
return err
}

default:
// Check if this is an enterprise only object being restored
restorer, ok := n.enterpriseRestorers[snapType]
Expand Down Expand Up @@ -1527,6 +1557,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
sink.Cancel()
return err
}
if err := s.persistClusterMetadata(sink, encoder); err != nil {
sink.Cancel()
return err
}
return nil
}

Expand Down Expand Up @@ -1874,6 +1908,24 @@ func (s *nomadSnapshot) persistSchedulerConfig(sink raft.SnapshotSink,
return nil
}

func (s *nomadSnapshot) persistClusterMetadata(sink raft.SnapshotSink,
encoder *codec.Encoder) error {

// Get the cluster metadata
clusterMetadata, err := s.snap.ClusterMetadata()
if err != nil {
return err
}

// Write out the cluster metadata
sink.Write([]byte{byte(ClusterMetadataSnapshot)})
if err := encoder.Encode(clusterMetadata); err != nil {
return err
}

return nil
}

// Release is a no-op, as we just need to GC the pointer
// to the state store snapshot. There is nothing to explicitly
// cleanup.
Expand Down
58 changes: 57 additions & 1 deletion nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func TestFSM_UpsertNode_Canonicalize(t *testing.T) {
fsm := testFSM(t)
fsm.blockedEvals.SetEnabled(true)

// Setup a node without eligiblity
// Setup a node without eligibility
node := mock.Node()
node.SchedulingEligibility = ""

Expand Down Expand Up @@ -2698,7 +2698,25 @@ func TestFSM_SnapshotRestore_SchedulerConfiguration(t *testing.T) {
require.Nil(err)
require.EqualValues(1000, index)
require.Equal(schedConfig, out)
}

func TestFSM_SnapshotRestore_ClusterMetadata(t *testing.T) {
t.Parallel()

fsm := testFSM(t)
state := fsm.State()
clusterID := "12345678-1234-1234-1234-1234567890"
now := time.Now().UnixNano()
meta := &structs.ClusterMetadata{ClusterID: clusterID, CreateTime: now}
state.ClusterSetMetadata(1000, meta)

// Verify the contents
require := require.New(t)
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
out, err := state2.ClusterMetadata()
require.NoError(err)
require.Equal(clusterID, out.ClusterID)
}

func TestFSM_ReconcileSummaries(t *testing.T) {
Expand Down Expand Up @@ -2972,3 +2990,41 @@ func TestFSM_SchedulerConfig(t *testing.T) {
require.True(config.PreemptionConfig.SystemSchedulerEnabled)
require.True(config.PreemptionConfig.BatchSchedulerEnabled)
}

func TestFSM_ClusterMetadata(t *testing.T) {
t.Parallel()
r := require.New(t)

fsm := testFSM(t)
clusterID := "12345678-1234-1234-1234-1234567890"
now := time.Now().UnixNano()
meta := structs.ClusterMetadata{
ClusterID: clusterID,
CreateTime: now,
}
buf, err := structs.Encode(structs.ClusterMetadataRequestType, meta)
r.NoError(err)

result := fsm.Apply(makeLog(buf))
r.Nil(result)

// Verify the clusterID is set directly in the state store
storedMetadata, err := fsm.state.ClusterMetadata()
r.NoError(err)
r.Equal(clusterID, storedMetadata.ClusterID)

// Check that the sanity check prevents accidental UUID regeneration
erroneous := structs.ClusterMetadata{
ClusterID: "99999999-9999-9999-9999-9999999999",
}
buf, err = structs.Encode(structs.ClusterMetadataRequestType, erroneous)
r.NoError(err)

result = fsm.Apply(makeLog(buf))
r.Error(result.(error))

storedMetadata, err = fsm.state.ClusterMetadata()
r.NoError(err)
r.Equal(clusterID, storedMetadata.ClusterID)
r.Equal(now, storedMetadata.CreateTime)
}
26 changes: 24 additions & 2 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ import (
"fmt"
"math/rand"
"net"
"strings"
"sync"
"time"

"golang.org/x/time/rate"

"strings"

metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
Expand All @@ -22,6 +21,7 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
"github.com/pkg/errors"
)

const (
Expand All @@ -44,6 +44,8 @@ var minAutopilotVersion = version.Must(version.NewVersion("0.8.0"))

var minSchedulerConfigVersion = version.Must(version.NewVersion("0.9.0"))

var minClusterIDVersion = version.Must(version.NewVersion("0.10.3"))

// Default configuration for scheduler with preemption enabled for system jobs
var defaultSchedulerConfig = &structs.SchedulerConfiguration{
PreemptionConfig: structs.PreemptionConfig{
Expand Down Expand Up @@ -201,6 +203,10 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
// Initialize scheduler configuration
s.getOrCreateSchedulerConfig()

// Initialize the ClusterID
_, _ = s.ClusterID()
// todo: use cluster ID for stuff, later!

// Enable the plan queue, since we are now the leader
s.planQueue.SetEnabled(true)

Expand Down Expand Up @@ -1327,3 +1333,19 @@ func (s *Server) getOrCreateSchedulerConfig() *structs.SchedulerConfiguration {

return config
}

func (s *Server) generateClusterID() (string, error) {
if !ServersMeetMinimumVersion(s.Members(), minClusterIDVersion, false) {
s.logger.Named("core").Warn("cannot initialize cluster ID until all servers are above minimum version", "min_version", minClusterIDVersion)
return "", errors.Errorf("cluster ID cannot be created until all servers are above minimum version %s", minClusterIDVersion)
}

newMeta := structs.ClusterMetadata{ClusterID: uuid.Generate(), CreateTime: time.Now().UnixNano()}
if _, _, err := s.raftApply(structs.ClusterMetadataRequestType, newMeta); err != nil {
s.logger.Named("core").Error("failed to create cluster ID", "error", err)
return "", errors.Wrap(err, "failed to create cluster ID")
}

s.logger.Named("core").Info("established cluster id", "cluster_id", newMeta.ClusterID, "create_time", newMeta.CreateTime)
return newMeta.ClusterID, nil
}
Loading

0 comments on commit 4f1c25c

Please sign in to comment.