From a7da586215fd653315885aebf8f1e29bd875e5bb Mon Sep 17 00:00:00 2001 From: Seth Hoenig Date: Thu, 14 Nov 2019 07:18:29 -0600 Subject: [PATCH] nomad: ensure a unique ClusterID exists when leader (gh-6702) Enable any Server to lookup the unique ClusterID. If one has not been generated, and this node is the leader, generate a UUID and attempt to apply it through raft. The value is not yet used anywhere in this changeset, but is a prerequisite for gh-6701. --- nomad/fsm.go | 52 +++++++++ nomad/fsm_test.go | 58 +++++++++- nomad/leader.go | 35 +++++- nomad/leader_test.go | 196 ++++++++++++++++++++++++++++++++ nomad/server.go | 43 +++++++ nomad/state/schema.go | 27 ++++- nomad/state/schema_test.go | 71 ++++++++++++ nomad/state/state_store.go | 63 +++++++++- nomad/state/state_store_test.go | 40 ++++++- nomad/structs/structs.go | 7 ++ 10 files changed, 582 insertions(+), 10 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index fbe9b2342be..b866f1d87ad 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -45,6 +45,7 @@ const ( ACLPolicySnapshot ACLTokenSnapshot SchedulerConfigSnapshot + ClusterMetadataSnapshot ) // LogApplier is the definition of a function that can apply a Raft log @@ -251,6 +252,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applySchedulerConfigUpdate(buf[1:], log.Index) case structs.NodeBatchDeregisterRequestType: return n.applyDeregisterNodeBatch(buf[1:], log.Index) + case structs.ClusterMetadataRequestType: + return n.applyClusterMetadata(buf[1:], log.Index) } // Check enterprise only message types. 
@@ -267,6 +270,24 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { panic(fmt.Errorf("failed to apply request: %#v", buf)) } +func (n *nomadFSM) applyClusterMetadata(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "cluster_meta"}, time.Now()) + + var req structs.ClusterMetadata + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + if err := n.state.ClusterSetMetadata(index, &req); err != nil { + n.logger.Error("ClusterSetMetadata failed", "error", err) + return err + } + + n.logger.Trace("ClusterSetMetadata", "cluster_id", req.ClusterID, "create_time", req.CreateTime) + + return nil +} + func (n *nomadFSM) applyUpsertNode(buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"nomad", "fsm", "register_node"}, time.Now()) var req structs.NodeRegisterRequest @@ -1259,6 +1280,15 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { return err } + case ClusterMetadataSnapshot: + meta := new(structs.ClusterMetadata) + if err := dec.Decode(meta); err != nil { + return err + } + if err := restore.ClusterMetadataRestore(meta); err != nil { + return err + } + default: // Check if this is an enterprise only object being restored restorer, ok := n.enterpriseRestorers[snapType] @@ -1531,6 +1561,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error { sink.Cancel() return err } + if err := s.persistClusterMetadata(sink, encoder); err != nil { + sink.Cancel() + return err + } return nil } @@ -1878,6 +1912,24 @@ func (s *nomadSnapshot) persistSchedulerConfig(sink raft.SnapshotSink, return nil } +func (s *nomadSnapshot) persistClusterMetadata(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + + // Get the cluster metadata + clusterMetadata, err := s.snap.ClusterMetadata() + if err != nil { + return err + } + + // Write out the cluster metadata + sink.Write([]byte{byte(ClusterMetadataSnapshot)}) + if err := 
encoder.Encode(clusterMetadata); err != nil { + return err + } + + return nil +} + // Release is a no-op, as we just need to GC the pointer // to the state store snapshot. There is nothing to explicitly // cleanup. diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index a9d712c6e38..b10ae138627 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -184,7 +184,7 @@ func TestFSM_UpsertNode_Canonicalize(t *testing.T) { fsm := testFSM(t) fsm.blockedEvals.SetEnabled(true) - // Setup a node without eligiblity + // Setup a node without eligibility node := mock.Node() node.SchedulingEligibility = "" @@ -2764,7 +2764,25 @@ func TestFSM_SnapshotRestore_SchedulerConfiguration(t *testing.T) { require.Nil(err) require.EqualValues(1000, index) require.Equal(schedConfig, out) +} + +func TestFSM_SnapshotRestore_ClusterMetadata(t *testing.T) { + t.Parallel() + + fsm := testFSM(t) + state := fsm.State() + clusterID := "12345678-1234-1234-1234-1234567890" + now := time.Now().UnixNano() + meta := &structs.ClusterMetadata{ClusterID: clusterID, CreateTime: now} + state.ClusterSetMetadata(1000, meta) + // Verify the contents + require := require.New(t) + fsm2 := testSnapshotRestore(t, fsm) + state2 := fsm2.State() + out, err := state2.ClusterMetadata() + require.NoError(err) + require.Equal(clusterID, out.ClusterID) } func TestFSM_ReconcileSummaries(t *testing.T) { @@ -3038,3 +3056,41 @@ func TestFSM_SchedulerConfig(t *testing.T) { require.True(config.PreemptionConfig.SystemSchedulerEnabled) require.True(config.PreemptionConfig.BatchSchedulerEnabled) } + +func TestFSM_ClusterMetadata(t *testing.T) { + t.Parallel() + r := require.New(t) + + fsm := testFSM(t) + clusterID := "12345678-1234-1234-1234-1234567890" + now := time.Now().UnixNano() + meta := structs.ClusterMetadata{ + ClusterID: clusterID, + CreateTime: now, + } + buf, err := structs.Encode(structs.ClusterMetadataRequestType, meta) + r.NoError(err) + + result := fsm.Apply(makeLog(buf)) + r.Nil(result) + + // Verify the 
clusterID is set directly in the state store + storedMetadata, err := fsm.state.ClusterMetadata() + r.NoError(err) + r.Equal(clusterID, storedMetadata.ClusterID) + + // Check that the sanity check prevents accidental UUID regeneration + erroneous := structs.ClusterMetadata{ + ClusterID: "99999999-9999-9999-9999-9999999999", + } + buf, err = structs.Encode(structs.ClusterMetadataRequestType, erroneous) + r.NoError(err) + + result = fsm.Apply(makeLog(buf)) + r.Error(result.(error)) + + storedMetadata, err = fsm.state.ClusterMetadata() + r.NoError(err) + r.Equal(clusterID, storedMetadata.ClusterID) + r.Equal(now, storedMetadata.CreateTime) +} diff --git a/nomad/leader.go b/nomad/leader.go index 0f2a9f063fd..a8e7d6bfbc5 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -6,13 +6,12 @@ import ( "fmt" "math/rand" "net" + "strings" "sync" "time" "golang.org/x/time/rate" - "strings" - metrics "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" @@ -22,6 +21,7 @@ import ( "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" + "github.com/pkg/errors" ) const ( @@ -44,6 +44,17 @@ var minAutopilotVersion = version.Must(version.NewVersion("0.8.0")) var minSchedulerConfigVersion = version.Must(version.NewVersion("0.9.0")) +var minClusterIDVersion = version.Must(version.NewVersion("0.10.3")) + +// Default configuration for scheduler with preemption enabled for system jobs +var defaultSchedulerConfig = &structs.SchedulerConfiguration{ + PreemptionConfig: structs.PreemptionConfig{ + SystemSchedulerEnabled: true, + BatchSchedulerEnabled: false, + ServiceSchedulerEnabled: false, + }, +} + // monitorLeadership is used to monitor if we acquire or lose our role // as the leader in the Raft cluster. 
There is some work the leader is // expected to do, so we must react to changes @@ -214,6 +225,10 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Initialize scheduler configuration s.getOrCreateSchedulerConfig() + // Initialize the ClusterID + _, _ = s.ClusterID() + // todo: use cluster ID for stuff, later! + // Enable the plan queue, since we are now the leader s.planQueue.SetEnabled(true) @@ -1340,3 +1355,19 @@ func (s *Server) getOrCreateSchedulerConfig() *structs.SchedulerConfiguration { return config } + +func (s *Server) generateClusterID() (string, error) { + if !ServersMeetMinimumVersion(s.Members(), minClusterIDVersion, false) { + s.logger.Named("core").Warn("cannot initialize cluster ID until all servers are above minimum version", "min_version", minClusterIDVersion) + return "", errors.Errorf("cluster ID cannot be created until all servers are above minimum version %s", minClusterIDVersion) + } + + newMeta := structs.ClusterMetadata{ClusterID: uuid.Generate(), CreateTime: time.Now().UnixNano()} + if _, _, err := s.raftApply(structs.ClusterMetadataRequestType, newMeta); err != nil { + s.logger.Named("core").Error("failed to create cluster ID", "error", err) + return "", errors.Wrap(err, "failed to create cluster ID") + } + + s.logger.Named("core").Info("established cluster id", "cluster_id", newMeta.ClusterID, "create_time", newMeta.CreateTime) + return newMeta.ClusterID, nil +} diff --git a/nomad/leader_test.go b/nomad/leader_test.go index 120692e673f..7cbd8278136 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -8,7 +8,10 @@ import ( "time" "github.com/hashicorp/consul/testutil/retry" + "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-version" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -629,6 +632,199 @@ func TestLeader_RestoreVaultAccessors(t 
*testing.T) { } } +func TestLeader_ClusterID(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 + c.Build = minClusterIDVersion.String() + }) + defer cleanupS1() + testutil.WaitForLeader(t, s1.RPC) + + clusterID, err := s1.ClusterID() + + require.NoError(t, err) + require.True(t, helper.IsUUID(clusterID)) +} + +func TestLeader_ClusterID_upgradePath(t *testing.T) { + t.Parallel() + + before := version.Must(version.NewVersion("0.10.1")).String() + after := minClusterIDVersion.String() + + type server struct { + s *Server + cleanup func() + } + + outdated := func(bootstrap bool) server { + s, cleanup := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 + c.Build = before + c.DevDisableBootstrap = bootstrap + c.BootstrapExpect = 3 + c.Logger.SetLevel(hclog.Trace) + }) + return server{s: s, cleanup: cleanup} + } + + upgraded := func(bootstrap bool) server { + s, cleanup := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 + c.Build = after + c.DevDisableBootstrap = bootstrap + c.BootstrapExpect = 3 + c.Logger.SetLevel(hclog.Trace) + }) + return server{s: s, cleanup: cleanup} + } + + servers := []server{outdated(false), outdated(true), outdated(true)} + // fallback shutdown attempt in case testing fails + defer servers[0].cleanup() + defer servers[1].cleanup() + defer servers[2].cleanup() + + upgrade := func(i int) { + previous := servers[i] + + servers[i] = upgraded(true) + TestJoin(t, servers[i].s, servers[(i+1)%3].s, servers[(i+2)%3].s) + testutil.WaitForLeader(t, servers[i].s.RPC) + + require.NoError(t, previous.s.Leave()) + require.NoError(t, previous.s.Shutdown()) + } + + // Join the servers before doing anything + TestJoin(t, servers[0].s, servers[1].s, servers[2].s) + + // Wait for servers to settle + for i := 0; i < len(servers); i++ { + testutil.WaitForLeader(t, servers[i].s.RPC) + } + + // A check that ClusterID is not available yet + noIDYet := func() { + for _, s := range servers { + retry.Run(t, 
func(r *retry.R) { + if _, err := s.s.ClusterID(); err == nil { + r.Error("expected error") + } + }) + } + } + + // Replace first old server with new server + upgrade(0) + defer servers[0].cleanup() + noIDYet() // ClusterID should not work yet, servers: [new, old, old] + + // Replace second old server with new server + upgrade(1) + defer servers[1].cleanup() + noIDYet() // ClusterID should not work yet, servers: [new, new, old] + + // Replace third / final old server with new server + upgrade(2) + defer servers[2].cleanup() + + // Wait for old servers to really be gone + for _, s := range servers { + testutil.WaitForResult(func() (bool, error) { + peers, _ := s.s.numPeers() + return peers == 3, nil + }, func(_ error) { + t.Fatalf("should have 3 peers") + }) + } + + // Now we can tickle the leader into making a cluster ID + leaderID := "" + for _, s := range servers { + if s.s.IsLeader() { + id, err := s.s.ClusterID() + require.NoError(t, err) + leaderID = id + break + } + } + require.True(t, helper.IsUUID(leaderID)) + + // Now every participating server has been upgraded, each one should be + // able to get the cluster ID, having been plumbed all the way through. 
+ agreeClusterID(t, []*Server{servers[0].s, servers[1].s, servers[2].s}) +} + +func TestLeader_ClusterID_noUpgrade(t *testing.T) { + t.Parallel() + + type server struct { + s *Server + cleanup func() + } + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.Logger.SetLevel(hclog.Trace) + c.NumSchedulers = 0 + c.Build = minClusterIDVersion.String() + c.BootstrapExpect = 3 + }) + defer cleanupS1() + s2, cleanupS2 := TestServer(t, func(c *Config) { + c.Logger.SetLevel(hclog.Trace) + c.NumSchedulers = 0 + c.Build = minClusterIDVersion.String() + c.DevDisableBootstrap = true + c.BootstrapExpect = 3 + }) + defer cleanupS2() + s3, cleanupS3 := TestServer(t, func(c *Config) { + c.Logger.SetLevel(hclog.Trace) + c.NumSchedulers = 0 + c.Build = minClusterIDVersion.String() + c.DevDisableBootstrap = true + c.BootstrapExpect = 3 + }) + defer cleanupS3() + + servers := []*Server{s1, s2, s3} + + // Join the servers before doing anything + TestJoin(t, servers[0], servers[1], servers[2]) + + // Wait for servers to settle + for i := 0; i < len(servers); i++ { + testutil.WaitForLeader(t, servers[i].RPC) + } + + // Each server started at the minimum version, check there should be only 1 + // cluster ID they all agree on. 
+ agreeClusterID(t, []*Server{servers[0], servers[1], servers[2]}) +} + +func agreeClusterID(t *testing.T, servers []*Server) { + retries := &retry.Timer{Timeout: 60 * time.Second, Wait: 1 * time.Second} + ids := make([]string, 3) + for i, s := range servers { + retry.RunWith(retries, t, func(r *retry.R) { + id, err := s.ClusterID() + if err != nil { + r.Error(err.Error()) + return + } + if !helper.IsUUID(id) { + r.Error("not a UUID") + return + } + ids[i] = id + }) + } + require.True(t, ids[0] == ids[1] && ids[1] == ids[2], "ids[0] %s, ids[1] %s, ids[2] %s", ids[0], ids[1], ids[2]) +} + func TestLeader_ReplicateACLPolicies(t *testing.T) { t.Parallel() diff --git a/nomad/server.go b/nomad/server.go index 7d69ec57b5c..4dd840973c7 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -219,6 +219,10 @@ type Server struct { leaderAcl string leaderAclLock sync.Mutex + // clusterIDLock ensures the server does not try to concurrently establish + // a cluster ID, racing against itself in calls of ClusterID + clusterIDLock sync.Mutex + // statsFetcher is used by autopilot to check the status of the other // Nomad router. statsFetcher *StatsFetcher @@ -1567,6 +1571,45 @@ func (s *Server) ReplicationToken() string { return s.config.ReplicationToken } +// ClusterID returns the unique ID for this cluster. +// +// Any Nomad server agent may call this method to get at the ID. +// If we are the leader and the ID has not yet been created, it will +// be created now. Otherwise an error is returned. +// +// The ID will not be created until all participating servers have reached +// a minimum version (0.10.3). 
+func (s *Server) ClusterID() (string, error) { + s.clusterIDLock.Lock() + defer s.clusterIDLock.Unlock() + + // try to load the cluster ID from state store + fsmState := s.fsm.State() + existingMeta, err := fsmState.ClusterMetadata() + if err != nil { + s.logger.Named("core").Error("failed to get cluster ID", "error", err) + return "", err + } + + // the cluster ID already exists in the state store, so return it as-is + if existingMeta != nil && existingMeta.ClusterID != "" { + return existingMeta.ClusterID, nil + } + + // if we are not the leader, nothing more we can do + if !s.IsLeader() { + return "", errors.New("cluster ID not ready yet") + } + + // we are the leader, try to generate the ID now + generatedID, err := s.generateClusterID() + if err != nil { + return "", err + } + + return generatedID, nil +} + // peersInfoContent is used to help operators understand what happened to the // peers.json file. This is written to a file called peers.info in the same // location. diff --git a/nomad/state/schema.go b/nomad/state/schema.go index dc262977e8c..c4bdf35478d 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -45,6 +45,7 @@ func init() { aclTokenTableSchema, autopilotConfigTableSchema, schedulerConfigTableSchema, + clusterMetaTableSchema, }...) } @@ -601,6 +602,12 @@ func aclTokenTableSchema() *memdb.TableSchema { } } +// singletonRecord can be used to describe tables which should contain only 1 entry. +// Example uses include storing node config or cluster metadata blobs. +var singletonRecord = &memdb.ConditionalIndex{ + Conditional: func(interface{}) (bool, error) { return true, nil }, +} + // schedulerConfigTableSchema returns the MemDB schema for the scheduler config table. 
// This table is used to store configuration options for the scheduler func schedulerConfigTableSchema() *memdb.TableSchema { @@ -611,10 +618,22 @@ func schedulerConfigTableSchema() *memdb.TableSchema { Name: "id", AllowMissing: true, Unique: true, - // This indexer ensures that this table is a singleton - Indexer: &memdb.ConditionalIndex{ - Conditional: func(obj interface{}) (bool, error) { return true, nil }, - }, + Indexer: singletonRecord, // we store only 1 scheduler config + }, + }, + } +} + +// clusterMetaTableSchema returns the MemDB schema for the cluster metadata table. +func clusterMetaTableSchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: "cluster_meta", + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + AllowMissing: false, + Unique: true, + Indexer: singletonRecord, // we store only 1 cluster metadata }, }, } diff --git a/nomad/state/schema_test.go b/nomad/state/schema_test.go index 1e1a17d8220..fd2b0f79fb9 100644 --- a/nomad/state/schema_test.go +++ b/nomad/state/schema_test.go @@ -4,6 +4,7 @@ import ( "testing" memdb "github.com/hashicorp/go-memdb" + "github.com/stretchr/testify/require" ) func TestStateStoreSchema(t *testing.T) { @@ -13,3 +14,73 @@ func TestStateStoreSchema(t *testing.T) { t.Fatalf("err: %v", err) } } + +func TestState_singleRecord(t *testing.T) { + require := require.New(t) + + const ( + singletonTable = "cluster_meta" + singletonIDIdx = "id" + ) + + db, err := memdb.NewMemDB(&memdb.DBSchema{ + Tables: map[string]*memdb.TableSchema{ + singletonTable: clusterMetaTableSchema(), + }, + }) + require.NoError(err) + + // numRecordsInTable counts all the items in the table, which is expected + // to always be 1 since that's the point of the singletonRecord Indexer. 
+ numRecordsInTable := func() int { + txn := db.Txn(false) + defer txn.Abort() + + iter, err := txn.Get(singletonTable, singletonIDIdx) + require.NoError(err) + + num := 0 + for item := iter.Next(); item != nil; item = iter.Next() { + num++ + } + return num + } + + // setSingleton "updates" the singleton record in the singletonTable, + // which requires that the singletonRecord Indexer is working as + // expected. + setSingleton := func(s string) { + txn := db.Txn(true) + err := txn.Insert(singletonTable, s) + require.NoError(err) + txn.Commit() + } + + // first retrieves the one expected entry in the singletonTable - use the + // numRecordsInTable helper function to make the cardinality assertion, + // this is just for fetching the value. + first := func() string { + txn := db.Txn(false) + defer txn.Abort() + record, err := txn.First(singletonTable, singletonIDIdx) + require.NoError(err) + s, ok := record.(string) + require.True(ok) + return s + } + + // Ensure that multiple Insert & Commit calls result in only + // a single "singleton" record existing in the table. + + setSingleton("one") + require.Equal(1, numRecordsInTable()) + require.Equal("one", first()) + + setSingleton("two") + require.Equal(1, numRecordsInTable()) + require.Equal("two", first()) + + setSingleton("three") + require.Equal(1, numRecordsInTable()) + require.Equal("three", first()) +} diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 1590a3fee3e..f28910f4f78 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -3,16 +3,16 @@ package state import ( "context" "fmt" + "reflect" "sort" "time" - "reflect" - log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" + "github.com/pkg/errors" ) // Txn is a transaction against a state store. 
@@ -3918,6 +3918,35 @@ func (s *StateStore) SchedulerSetConfig(idx uint64, config *structs.SchedulerCon return nil } +func (s *StateStore) ClusterMetadata() (*structs.ClusterMetadata, error) { + txn := s.db.Txn(false) + defer txn.Abort() + + // Get the cluster metadata + m, err := txn.First("cluster_meta", "id") + if err != nil { + return nil, errors.Wrap(err, "failed cluster metadata lookup") + } + + if m != nil { + return m.(*structs.ClusterMetadata), nil + } + + return nil, nil +} + +func (s *StateStore) ClusterSetMetadata(index uint64, meta *structs.ClusterMetadata) error { + txn := s.db.Txn(true) + defer txn.Abort() + + if err := s.setClusterMetadata(txn, meta); err != nil { + return errors.Wrap(err, "set cluster metadata failed") + } + + txn.Commit() + return nil +} + // WithWriteTransaction executes the passed function within a write transaction, // and returns its result. If the invocation returns no error, the transaction // is committed; otherwise, it's aborted. @@ -3980,6 +4009,29 @@ func (s *StateStore) schedulerSetConfigTxn(idx uint64, tx *memdb.Txn, config *st return nil } +func (s *StateStore) setClusterMetadata(txn *memdb.Txn, meta *structs.ClusterMetadata) error { + // Check for an existing config, if it exists, sanity check the cluster ID matches + existing, err := txn.First("cluster_meta", "id") + if err != nil { + return fmt.Errorf("failed cluster meta lookup: %v", err) + } + + if existing != nil { + existingClusterID := existing.(*structs.ClusterMetadata).ClusterID + if meta.ClusterID != existingClusterID { + // there is a bug in cluster ID detection + return fmt.Errorf("refusing to set new cluster id, previous: %s, new: %s", existingClusterID, meta.ClusterID) + } + } + + // update is technically a noop, unless someday we add more / mutable fields + if err := txn.Insert("cluster_meta", meta); err != nil { + return fmt.Errorf("set cluster metadata failed: %v", err) + } + + return nil +} + // StateSnapshot is used to provide a point-in-time 
snapshot type StateSnapshot struct { StateStore @@ -4182,3 +4234,10 @@ func (r *StateRestore) SchedulerConfigRestore(schedConfig *structs.SchedulerConf } return nil } + +func (r *StateRestore) ClusterMetadataRestore(meta *structs.ClusterMetadata) error { + if err := r.txn.Insert("cluster_meta", meta); err != nil { + return fmt.Errorf("inserting cluster meta failed: %v", err) + } + return nil +} diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index edfa5cd3de3..b01018cc0c8 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -7345,7 +7345,6 @@ func TestStateStore_SchedulerConfig(t *testing.T) { require := require.New(t) restore, err := state.Restore() - require.Nil(err) err = restore.SchedulerConfigRestore(schedConfig) @@ -7360,6 +7359,45 @@ func TestStateStore_SchedulerConfig(t *testing.T) { require.Equal(schedConfig, out) } +func TestStateStore_ClusterMetadata(t *testing.T) { + require := require.New(t) + + state := testStateStore(t) + clusterID := "12345678-1234-1234-1234-1234567890" + now := time.Now().UnixNano() + meta := &structs.ClusterMetadata{ClusterID: clusterID, CreateTime: now} + + err := state.ClusterSetMetadata(100, meta) + require.NoError(err) + + result, err := state.ClusterMetadata() + require.NoError(err) + require.Equal(clusterID, result.ClusterID) + require.Equal(now, result.CreateTime) +} + +func TestStateStore_ClusterMetadataRestore(t *testing.T) { + require := require.New(t) + + state := testStateStore(t) + clusterID := "12345678-1234-1234-1234-1234567890" + now := time.Now().UnixNano() + meta := &structs.ClusterMetadata{ClusterID: clusterID, CreateTime: now} + + restore, err := state.Restore() + require.NoError(err) + + err = restore.ClusterMetadataRestore(meta) + require.NoError(err) + + restore.Commit() + + out, err := state.ClusterMetadata() + require.NoError(err) + require.Equal(clusterID, out.ClusterID) + require.Equal(now, out.CreateTime) +} + func TestStateStore_Abandon(t 
*testing.T) { t.Parallel() diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index e41af3e99b0..8d6333c2142 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -83,6 +83,7 @@ const ( BatchNodeUpdateDrainRequestType SchedulerConfigRequestType NodeBatchDeregisterRequestType + ClusterMetadataRequestType ) const ( @@ -879,6 +880,12 @@ type ServerMember struct { DelegateCur uint8 } +// ClusterMetadata is used to store per-cluster metadata. +type ClusterMetadata struct { + ClusterID string + CreateTime int64 +} + // DeriveVaultTokenRequest is used to request wrapped Vault tokens for the // following tasks in the given allocation type DeriveVaultTokenRequest struct {