diff --git a/nomad/fsm.go b/nomad/fsm.go
index c41e5b9d1b4..4adcea57d01 100644
--- a/nomad/fsm.go
+++ b/nomad/fsm.go
@@ -45,6 +45,7 @@ const (
 	ACLPolicySnapshot
 	ACLTokenSnapshot
 	SchedulerConfigSnapshot
+	ClusterMetadataSnapshot
 )
 
 // LogApplier is the definition of a function that can apply a Raft log
@@ -251,6 +252,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
 		return n.applySchedulerConfigUpdate(buf[1:], log.Index)
 	case structs.NodeBatchDeregisterRequestType:
 		return n.applyDeregisterNodeBatch(buf[1:], log.Index)
+	case structs.ClusterMetadataRequestType:
+		return n.applyClusterMetadata(buf[1:], log.Index)
 	}
 
 	// Check enterprise only message types.
@@ -267,6 +270,23 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
 	panic(fmt.Errorf("failed to apply request: %#v", buf))
 }
 
+func (n *nomadFSM) applyClusterMetadata(buf []byte, index uint64) interface{} {
+	defer metrics.MeasureSince([]string{"nomad", "fsm", "cluster_meta"}, time.Now())
+
+	var req structs.ClusterMetadata
+	if err := structs.Decode(buf, &req); err != nil {
+		panic(fmt.Errorf("failed to decode request: %v", err))
+	}
+
+	if err := n.state.ClusterSetMetadata(index, &req); err != nil {
+		n.logger.Error("ClusterSetMetadata failed", "error", err)
+		return nil
+	}
+
+	n.logger.Trace("ClusterSetMetadata", "cluster_id", req.ClusterID)
+	return nil
+}
+
 func (n *nomadFSM) applyUpsertNode(buf []byte, index uint64) interface{} {
 	defer metrics.MeasureSince([]string{"nomad", "fsm", "register_node"}, time.Now())
 	var req structs.NodeRegisterRequest
@@ -1255,6 +1275,15 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
 				return err
 			}
 
+		case ClusterMetadataSnapshot:
+			meta := new(structs.ClusterMetadata)
+			if err := dec.Decode(meta); err != nil {
+				return err
+			}
+			if err := restore.ClusterMetadataRestore(meta); err != nil {
+				return err
+			}
+
 		default:
 			// Check if this is an enterprise only object being restored
 			restorer, ok := n.enterpriseRestorers[snapType]
@@ -1527,6 +1556,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
 		sink.Cancel()
 		return err
 	}
+	if err := s.persistClusterMetadata(sink, encoder); err != nil {
+		sink.Cancel()
+		return err
+	}
 	return nil
 }
 
@@ -1874,6 +1907,29 @@ func (s *nomadSnapshot) persistSchedulerConfig(sink raft.SnapshotSink,
 	return nil
 }
 
+func (s *nomadSnapshot) persistClusterMetadata(sink raft.SnapshotSink,
+	encoder *codec.Encoder) error {
+
+	// Get the cluster metadata
+	clusterMetadata, err := s.snap.ClusterMetadata()
+	if err != nil {
+		return err
+	}
+
+	// Skip the entry if no cluster ID has been set yet, so a restore
+	// does not insert an empty ClusterMetadata record
+	if clusterMetadata == nil {
+		return nil
+	}
+
+	// Write out the cluster metadata
+	sink.Write([]byte{byte(ClusterMetadataSnapshot)})
+	if err := encoder.Encode(clusterMetadata); err != nil {
+		return err
+	}
+
+	return nil
+}
+
 // Release is a no-op, as we just need to GC the pointer
 // to the state store snapshot. There is nothing to explicitly
 // cleanup.
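For orientation, the sketch below (not part of the diff) shows how a cluster metadata update reaches applyClusterMetadata: structs.Encode prefixes the msgpack payload with the request type byte, and Apply dispatches on that byte. The helper name and the direct raft.Log construction are illustrative only; real callers go through the server's raftApply helper, and the tests in fsm_test.go below exercise the same path via makeLog.

// Illustrative sketch only; assumes it sits in package nomad alongside the FSM.
func exampleApplyClusterMetadata(fsm *nomadFSM) error {
	meta := structs.ClusterMetadata{ClusterID: uuid.Generate()}

	// structs.Encode prepends the request type byte to the msgpack body.
	buf, err := structs.Encode(structs.ClusterMetadataRequestType, meta)
	if err != nil {
		return err
	}

	// Apply switches on buf[0] and hands buf[1:] to applyClusterMetadata.
	if resp := fsm.Apply(&raft.Log{Index: 1, Data: buf}); resp != nil {
		if respErr, ok := resp.(error); ok {
			return respErr
		}
	}
	return nil
}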
diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go
index d75ba1ee396..7b69315a766 100644
--- a/nomad/fsm_test.go
+++ b/nomad/fsm_test.go
@@ -2698,7 +2698,24 @@ func TestFSM_SnapshotRestore_SchedulerConfiguration(t *testing.T) {
 	require.Nil(err)
 	require.EqualValues(1000, index)
 	require.Equal(schedConfig, out)
+}
+
+func TestFSM_SnapshotRestore_ClusterMetadata(t *testing.T) {
+	t.Parallel()
+
+	fsm := testFSM(t)
+	state := fsm.State()
+	clusterID := "12345678-1234-1234-1234-1234567890"
+	meta := &structs.ClusterMetadata{ClusterID: clusterID}
+	state.ClusterSetMetadata(1000, meta)
+	// Verify the contents
+	require := require.New(t)
+	fsm2 := testSnapshotRestore(t, fsm)
+	state2 := fsm2.State()
+	out, err := state2.ClusterMetadata()
+	require.NoError(err)
+	require.Equal(clusterID, out.ClusterID)
 }
 
 func TestFSM_ReconcileSummaries(t *testing.T) {
@@ -2972,3 +2989,42 @@ func TestFSM_SchedulerConfig(t *testing.T) {
 	require.True(config.PreemptionConfig.SystemSchedulerEnabled)
 	require.True(config.PreemptionConfig.BatchSchedulerEnabled)
 }
+
+func TestFSM_ClusterMetadata(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	fsm := testFSM(t)
+	clusterID := "12345678-1234-1234-1234-1234567890"
+	meta := structs.ClusterMetadata{
+		ClusterID: clusterID,
+	}
+	buf, err := structs.Encode(structs.ClusterMetadataRequestType, meta)
+	require.NoError(err)
+
+	result := fsm.Apply(makeLog(buf))
+	if _, ok := result.(error); ok {
+		t.Fatalf("bad: %v", result)
+	}
+
+	// Verify the clusterID is set directly in the state store
+	storedMetadata, err := fsm.state.ClusterMetadata()
+	require.NoError(err)
+	require.Equal(clusterID, storedMetadata.ClusterID)
+
+	// Check that the sanity check prevents accidental UUID regeneration
+	erroneous := structs.ClusterMetadata{
+		ClusterID: "99999999-9999-9999-9999-9999999999",
+	}
+	buf, err = structs.Encode(structs.ClusterMetadataRequestType, erroneous)
+	require.NoError(err)
+
+	result = fsm.Apply(makeLog(buf))
+	if _, ok := result.(error); ok {
+		t.Fatalf("bad: %v", result)
+	}
+
+	storedMetadata, err = fsm.state.ClusterMetadata()
+	require.NoError(err)
+	require.Equal(clusterID, storedMetadata.ClusterID)
+}
diff --git a/nomad/leader.go b/nomad/leader.go
index ec406a4b4b1..da12832afec 100644
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -6,13 +6,12 @@ import (
 	"fmt"
 	"math/rand"
 	"net"
+	"strings"
 	"sync"
 	"time"
 
 	"golang.org/x/time/rate"
 
-	"strings"
-
 	metrics "github.com/armon/go-metrics"
 	log "github.com/hashicorp/go-hclog"
 	memdb "github.com/hashicorp/go-memdb"
@@ -201,6 +200,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
 	// Initialize scheduler configuration
 	s.getOrCreateSchedulerConfig()
 
+	// Initialize the ClusterID
+	s.getOrCreateClusterID()
+
 	// Enable the plan queue, since we are now the leader
 	s.planQueue.SetEnabled(true)
 
@@ -1327,3 +1329,25 @@ func (s *Server) getOrCreateSchedulerConfig() *structs.SchedulerConfiguration {
 
 	return config
 }
+
+func (s *Server) getOrCreateClusterID() string {
+	fsmState := s.fsm.State()
+	existingMeta, err := fsmState.ClusterMetadata()
+	if err != nil {
+		s.logger.Named("core").Error("failed to get cluster ID", "error", err)
+		return ""
+	}
+
+	if existingMeta == nil || existingMeta.ClusterID == "" {
+		newMeta := structs.ClusterMetadata{ClusterID: uuid.Generate()}
+		if _, _, err = s.raftApply(structs.ClusterMetadataRequestType, newMeta); err != nil {
+			s.logger.Named("core").Error("failed to create cluster ID", "error", err)
+			return ""
+		}
+		s.logger.Info("established cluster id", "cluster_id", newMeta.ClusterID)
+		return newMeta.ClusterID
+	}
+
+	s.logger.Trace("existing cluster id", "cluster_id", existingMeta.ClusterID)
+	return existingMeta.ClusterID
+}
diff --git a/nomad/leader_test.go b/nomad/leader_test.go
index 2ffceb05390..1274eb78a91 100644
--- a/nomad/leader_test.go
+++ b/nomad/leader_test.go
@@ -9,6 +9,7 @@ import (
 
 	"github.com/hashicorp/consul/testutil/retry"
 	memdb "github.com/hashicorp/go-memdb"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -629,6 +630,18 @@ func TestLeader_RestoreVaultAccessors(t *testing.T) {
 	}
 }
 
+func TestLeader_ClusterMetadata(t *testing.T) {
+	s1 := TestServer(t, func(c *Config) {
+		c.NumSchedulers = 0
+	})
+	defer s1.Shutdown()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	meta, err := s1.fsm.State().ClusterMetadata()
+	require.NoError(t, err)
+	require.True(t, helper.IsUUID(meta.ClusterID))
+}
+
 func TestLeader_ReplicateACLPolicies(t *testing.T) {
 	t.Parallel()
 	s1, root := TestACLServer(t, func(c *Config) {
diff --git a/nomad/state/schema.go b/nomad/state/schema.go
index dc262977e8c..0bed41eab28 100644
--- a/nomad/state/schema.go
+++ b/nomad/state/schema.go
@@ -45,6 +45,7 @@ func init() {
 		aclTokenTableSchema,
 		autopilotConfigTableSchema,
 		schedulerConfigTableSchema,
+		clusterMetaTableSchema,
 	}...)
 }
 
@@ -601,6 +602,12 @@ func aclTokenTableSchema() *memdb.TableSchema {
 	}
 }
 
+// singleRecord can be used to describe tables which should contain only 1 entry.
+// Example uses include storing node config or cluster metadata blobs.
+var singleRecord = &memdb.ConditionalIndex{
+	Conditional: func(interface{}) (bool, error) { return true, nil },
+}
+
 // schedulerConfigTableSchema returns the MemDB schema for the scheduler config table.
 // This table is used to store configuration options for the scheduler
 func schedulerConfigTableSchema() *memdb.TableSchema {
@@ -611,10 +618,22 @@ func schedulerConfigTableSchema() *memdb.TableSchema {
 				Name:         "id",
 				AllowMissing: true,
 				Unique:       true,
-				// This indexer ensures that this table is a singleton
-				Indexer: &memdb.ConditionalIndex{
-					Conditional: func(obj interface{}) (bool, error) { return true, nil },
-				},
+				Indexer:      singleRecord, // we store only 1 scheduler config
+			},
+		},
+	}
+}
+
+// clusterMetaTableSchema returns the MemDB schema for the cluster metadata table.
+func clusterMetaTableSchema() *memdb.TableSchema {
+	return &memdb.TableSchema{
+		Name: "cluster_meta",
+		Indexes: map[string]*memdb.IndexSchema{
+			"id": {
+				Name:         "id",
+				AllowMissing: false,
+				Unique:       true,
+				Indexer:      singleRecord, // we store only 1 cluster metadata
 			},
 		},
 	}
 }
diff --git a/nomad/state/schema_test.go b/nomad/state/schema_test.go
index 1e1a17d8220..6bcab0e0999 100644
--- a/nomad/state/schema_test.go
+++ b/nomad/state/schema_test.go
@@ -4,6 +4,7 @@ import (
 	"testing"
 
 	memdb "github.com/hashicorp/go-memdb"
+	"github.com/stretchr/testify/require"
 )
 
 func TestStateStoreSchema(t *testing.T) {
@@ -13,3 +14,54 @@ func TestStateStoreSchema(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}
 }
+
+func TestState_singleRecord(t *testing.T) {
+	require := require.New(t)
+
+	db, err := memdb.NewMemDB(&memdb.DBSchema{
+		Tables: map[string]*memdb.TableSchema{
+			"example": {
+				Name: "example",
+				Indexes: map[string]*memdb.IndexSchema{
+					"id": {
+						Name:         "id",
+						AllowMissing: false,
+						Unique:       true,
+						Indexer:      singleRecord,
+					},
+				},
+			},
+		},
+	})
+	require.NoError(err)
+
+	count := func() int {
+		txn := db.Txn(false)
+		defer txn.Abort()
+
+		iter, err := txn.Get("example", "id")
+		require.NoError(err)
+
+		num := 0
+		for item := iter.Next(); item != nil; item = iter.Next() {
+			num++
+		}
+		return num
+	}
+
+	insert := func(s string) {
+		txn := db.Txn(true)
+		err := txn.Insert("example", s)
+		require.NoError(err)
+		txn.Commit()
+	}
+
+	insert("one")
+	require.Equal(1, count())
+
+	insert("two")
+	require.Equal(1, count())
+
+	insert("three")
+	require.Equal(1, count())
+}
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index ab61390b5c3..be998777a7b 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -3,11 +3,10 @@ package state
 import (
 	"context"
 	"fmt"
+	"reflect"
 	"sort"
 	"time"
 
-	"reflect"
-
 	log "github.com/hashicorp/go-hclog"
 	memdb "github.com/hashicorp/go-memdb"
 	multierror "github.com/hashicorp/go-multierror"
@@ -3924,6 +3923,36 @@ func (s *StateStore) SchedulerSetConfig(idx uint64, config *structs.SchedulerCon
 	return nil
 }
 
+func (s *StateStore) ClusterMetadata() (*structs.ClusterMetadata, error) {
+	txn := s.db.Txn(false)
+	defer txn.Abort()
+
+	// Get the cluster metadata
+	m, err := txn.First("cluster_meta", "id")
+	if err != nil {
+		return nil, fmt.Errorf("cluster metadata lookup failed: %v", err)
+	}
+
+	metadata, ok := m.(*structs.ClusterMetadata)
+	if !ok {
+		return nil, nil
+	}
+
+	return metadata, nil
+}
+
+func (s *StateStore) ClusterSetMetadata(index uint64, meta *structs.ClusterMetadata) error {
+	txn := s.db.Txn(true)
+	defer txn.Abort()
+
+	if err := s.setClusterMetadata(txn, meta); err != nil {
+		return fmt.Errorf("set cluster metadata failed: %v", err)
+	}
+
+	txn.Commit()
+	return nil
+}
+
 // WithWriteTransaction executes the passed function within a write transaction,
 // and returns its result. If the invocation returns no error, the transaction
 // is committed; otherwise, it's aborted.
@@ -3986,6 +4015,29 @@ func (s *StateStore) schedulerSetConfigTxn(idx uint64, tx *memdb.Txn, config *st
 	return nil
 }
 
+func (s *StateStore) setClusterMetadata(txn *memdb.Txn, meta *structs.ClusterMetadata) error {
+	// Check for an existing cluster metadata record; if one exists, verify the ClusterID matches
+	existing, err := txn.First("cluster_meta", "id")
+	if err != nil {
+		return fmt.Errorf("failed cluster meta lookup: %v", err)
+	}
+
+	if existing != nil {
+		existingClusterID := existing.(*structs.ClusterMetadata).ClusterID
+		if meta.ClusterID != existingClusterID {
+			// a mismatch indicates a bug in cluster ID detection; refuse to overwrite the existing ID
+			return fmt.Errorf("refusing to set new cluster id, previous: %s, new: %s", existingClusterID, meta.ClusterID)
+		}
+	}
+
+	// the update is technically a no-op until more (mutable) fields are added
+	if err := txn.Insert("cluster_meta", meta); err != nil {
+		return fmt.Errorf("set cluster metadata failed: %v", err)
+	}
+
+	return nil
+}
+
 // StateSnapshot is used to provide a point-in-time snapshot
 type StateSnapshot struct {
 	StateStore
@@ -4189,6 +4241,13 @@ func (r *StateRestore) SchedulerConfigRestore(schedConfig *structs.SchedulerConf
 	return nil
 }
 
+func (r *StateRestore) ClusterMetadataRestore(meta *structs.ClusterMetadata) error {
+	if err := r.txn.Insert("cluster_meta", meta); err != nil {
+		return fmt.Errorf("inserting cluster meta failed: %v", err)
+	}
+	return nil
+}
+
 // addEphemeralDiskToTaskGroups adds missing EphemeralDisk objects to TaskGroups
 func (r *StateRestore) addEphemeralDiskToTaskGroups(job *structs.Job) {
 	for _, tg := range job.TaskGroups {
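In addition to the tests below, here is a small test-style sketch (hypothetical test name, using the same helpers as the surrounding state package tests) of the guard in setClusterMetadata above: once a ClusterID has been stored, a write carrying a different ID returns an error instead of replacing it.

func TestStateStore_ClusterSetMetadata_refusesNewID(t *testing.T) {
	require := require.New(t)
	state := testStateStore(t)

	// The first write establishes the cluster ID.
	first := &structs.ClusterMetadata{ClusterID: "original-id"}
	require.NoError(state.ClusterSetMetadata(100, first))

	// A second write with a different ID is rejected by setClusterMetadata.
	second := &structs.ClusterMetadata{ClusterID: "different-id"}
	require.Error(state.ClusterSetMetadata(101, second))

	// The stored metadata is unchanged.
	out, err := state.ClusterMetadata()
	require.NoError(err)
	require.Equal("original-id", out.ClusterID)
}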
diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go
index 1fc6dcf77a2..e178c485fb9 100644
--- a/nomad/state/state_store_test.go
+++ b/nomad/state/state_store_test.go
@@ -7098,6 +7098,41 @@ func TestStateStore_SchedulerConfig(t *testing.T) {
 	require.Equal(schedConfig, out)
 }
 
+func TestStateStore_ClusterMetadata(t *testing.T) {
+	require := require.New(t)
+
+	state := testStateStore(t)
+	clusterID := "12345678-1234-1234-1234-1234567890"
+	meta := &structs.ClusterMetadata{ClusterID: clusterID}
+
+	err := state.ClusterSetMetadata(100, meta)
+	require.NoError(err)
+
+	result, err := state.ClusterMetadata()
+	require.NoError(err)
+	require.Equal(clusterID, result.ClusterID)
+}
+
+func TestStateStore_ClusterMetadataRestore(t *testing.T) {
+	require := require.New(t)
+
+	state := testStateStore(t)
+	clusterID := "12345678-1234-1234-1234-1234567890"
+	meta := &structs.ClusterMetadata{ClusterID: clusterID}
+
+	restore, err := state.Restore()
+	require.NoError(err)
+
+	err = restore.ClusterMetadataRestore(meta)
+	require.NoError(err)
+
+	restore.Commit()
+
+	out, err := state.ClusterMetadata()
+	require.NoError(err)
+	require.Equal(clusterID, out.ClusterID)
+}
+
 func TestStateStore_Abandon(t *testing.T) {
 	s := testStateStore(t)
 	abandonCh := s.AbandonCh()
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index d96e4cf6e7e..0c9603f38bb 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -81,6 +81,7 @@
 	BatchNodeUpdateDrainRequestType
 	SchedulerConfigRequestType
 	NodeBatchDeregisterRequestType
+	ClusterMetadataRequestType
 )
 
 const (
@@ -831,6 +832,11 @@ type ServerMember struct {
 	DelegateCur uint8
 }
 
+// ClusterMetadata is used to store per-cluster metadata.
+type ClusterMetadata struct {
+	ClusterID string
+}
+
 // DeriveVaultTokenRequest is used to request wrapped Vault tokens for the
 // following tasks in the given allocation
 type DeriveVaultTokenRequest struct {