Skip to content

Commit

Permalink
nomad: ensure a unique ClusterID exists when leader (gh-6702)
Browse files Browse the repository at this point in the history
As leader, check to see if a ClusterID has already been generated
and replicated through the raft log. If one has not been generated,
generate a UUID and push it through. If a ClusterID has been generated,
its value is applied into the state store. The value is not actually
used anywhere in this changeset, but is a prerequisite for gh-6701.
  • Loading branch information
shoenig committed Nov 14, 2019
1 parent 455f9dd commit f5477c9
Show file tree
Hide file tree
Showing 9 changed files with 323 additions and 9 deletions.
51 changes: 51 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const (
ACLPolicySnapshot
ACLTokenSnapshot
SchedulerConfigSnapshot
ClusterMetadataSnapshot
)

// LogApplier is the definition of a function that can apply a Raft log
Expand Down Expand Up @@ -251,6 +252,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applySchedulerConfigUpdate(buf[1:], log.Index)
case structs.NodeBatchDeregisterRequestType:
return n.applyDeregisterNodeBatch(buf[1:], log.Index)
case structs.ClusterMetadataRequestType:
return n.applyClusterMetadata(buf[1:], log.Index)
}

// Check enterprise only message types.
Expand All @@ -267,6 +270,23 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
panic(fmt.Errorf("failed to apply request: %#v", buf))
}

func (n *nomadFSM) applyClusterMetadata(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "cluster_meta"}, time.Now())

var req structs.ClusterMetadata
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.ClusterSetMetadata(index, &req); err != nil {
n.logger.Error("ClusterSetMetadata failed", "error", err)
}

n.logger.Trace("ClusterSetMetadata", "cluster_id", req.ClusterID)

return nil
}

func (n *nomadFSM) applyUpsertNode(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "register_node"}, time.Now())
var req structs.NodeRegisterRequest
Expand Down Expand Up @@ -1255,6 +1275,15 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
return err
}

case ClusterMetadataSnapshot:
meta := new(structs.ClusterMetadata)
if err := dec.Decode(meta); err != nil {
return err
}
if err := restore.ClusterMetadataRestore(meta); err != nil {
return err
}

default:
// Check if this is an enterprise only object being restored
restorer, ok := n.enterpriseRestorers[snapType]
Expand Down Expand Up @@ -1527,6 +1556,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
sink.Cancel()
return err
}
if err := s.persistClusterMetadata(sink, encoder); err != nil {
sink.Cancel()
return err
}
return nil
}

Expand Down Expand Up @@ -1874,6 +1907,24 @@ func (s *nomadSnapshot) persistSchedulerConfig(sink raft.SnapshotSink,
return nil
}

func (s *nomadSnapshot) persistClusterMetadata(sink raft.SnapshotSink,
encoder *codec.Encoder) error {

// Get the cluster metadata
clusterMetadata, err := s.snap.ClusterMetadata()
if err != nil {
return err
}

// Write out the cluster metadata
sink.Write([]byte{byte(ClusterMetadataSnapshot)})
if err := encoder.Encode(clusterMetadata); err != nil {
return err
}

return nil
}

// Release is a no-op, as we just need to GC the pointer
// to the state store snapshot. There is nothing to explicitly
// cleanup.
Expand Down
56 changes: 56 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2698,7 +2698,24 @@ func TestFSM_SnapshotRestore_SchedulerConfiguration(t *testing.T) {
require.Nil(err)
require.EqualValues(1000, index)
require.Equal(schedConfig, out)
}

func TestFSM_SnapshotRestore_ClusterMetadata(t *testing.T) {
t.Parallel()

fsm := testFSM(t)
state := fsm.State()
clusterID := "12345678-1234-1234-1234-1234567890"
meta := &structs.ClusterMetadata{ClusterID: clusterID}
state.ClusterSetMetadata(1000, meta)

// Verify the contents
require := require.New(t)
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
out, err := state2.ClusterMetadata()
require.NoError(err)
require.Equal(clusterID, out.ClusterID)
}

func TestFSM_ReconcileSummaries(t *testing.T) {
Expand Down Expand Up @@ -2972,3 +2989,42 @@ func TestFSM_SchedulerConfig(t *testing.T) {
require.True(config.PreemptionConfig.SystemSchedulerEnabled)
require.True(config.PreemptionConfig.BatchSchedulerEnabled)
}

func TestFSM_ClusterMetadata(t *testing.T) {
t.Parallel()
require := require.New(t)

fsm := testFSM(t)
clusterID := "12345678-1234-1234-1234-1234567890"
meta := structs.ClusterMetadata{
ClusterID: clusterID,
}
buf, err := structs.Encode(structs.ClusterMetadataRequestType, meta)
require.NoError(err)

result := fsm.Apply(makeLog(buf))
if _, ok := result.(error); ok {
t.Fatalf("bad: %v", result)
}

// Verify the clusterID is set directly in the state store
storedMetadata, err := fsm.state.ClusterMetadata()
require.NoError(err)
require.Equal(clusterID, storedMetadata.ClusterID)

// Check that the sanity check prevents accidental UUID regeneration
erroneous := structs.ClusterMetadata{
ClusterID: "99999999-9999-9999-9999-9999999999",
}
buf, err = structs.Encode(structs.ClusterMetadataRequestType, erroneous)
require.NoError(err)

result = fsm.Apply(makeLog(buf))
if _, ok := result.(error); ok {
t.Fatalf("bad: %v", result)
}

storedMetadata, err = fsm.state.ClusterMetadata()
require.NoError(err)
require.Equal(clusterID, storedMetadata.ClusterID)
}
28 changes: 26 additions & 2 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ import (
"fmt"
"math/rand"
"net"
"strings"
"sync"
"time"

"golang.org/x/time/rate"

"strings"

metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
Expand Down Expand Up @@ -201,6 +200,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
// Initialize scheduler configuration
s.getOrCreateSchedulerConfig()

// Initialize the ClusterID
s.getOrCreateClusterID()

// Enable the plan queue, since we are now the leader
s.planQueue.SetEnabled(true)

Expand Down Expand Up @@ -1327,3 +1329,25 @@ func (s *Server) getOrCreateSchedulerConfig() *structs.SchedulerConfiguration {

return config
}

func (s *Server) getOrCreateClusterID() string {
fsmState := s.fsm.State()
existingMeta, err := fsmState.ClusterMetadata()
if err != nil {
s.logger.Named("core").Error("failed to get cluster ID", "error", err)
return ""
}

if existingMeta == nil || existingMeta.ClusterID == "" {
newMeta := structs.ClusterMetadata{ClusterID: uuid.Generate()}
if _, _, err = s.raftApply(structs.ClusterMetadataRequestType, newMeta); err != nil {
s.logger.Named("core").Error("failed to create cluster ID", "error", err)
return ""
}
s.logger.Info("established cluster id", "cluster_id", newMeta.ClusterID)
return newMeta.ClusterID
}

s.logger.Trace("existing cluster id", "cluster_id", existingMeta.ClusterID)
return existingMeta.ClusterID
}
13 changes: 13 additions & 0 deletions nomad/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/hashicorp/consul/testutil/retry"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -629,6 +630,18 @@ func TestLeader_RestoreVaultAccessors(t *testing.T) {
}
}

func TestLeader_ClusterMetadata(t *testing.T) {
s1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

meta, err := s1.fsm.State().ClusterMetadata()
require.NoError(t, err)
require.True(t, helper.IsUUID(meta.ClusterID))
}

func TestLeader_ReplicateACLPolicies(t *testing.T) {
t.Parallel()
s1, root := TestACLServer(t, func(c *Config) {
Expand Down
27 changes: 23 additions & 4 deletions nomad/state/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func init() {
aclTokenTableSchema,
autopilotConfigTableSchema,
schedulerConfigTableSchema,
clusterMetaTableSchema,
}...)
}

Expand Down Expand Up @@ -601,6 +602,12 @@ func aclTokenTableSchema() *memdb.TableSchema {
}
}

// singleRecord can be used to describe tables which should contain only 1 entry.
// Example uses include storing node config or cluster metadata blobs.
var singleRecord = &memdb.ConditionalIndex{
Conditional: func(interface{}) (bool, error) { return true, nil },
}

// schedulerConfigTableSchema returns the MemDB schema for the scheduler config table.
// This table is used to store configuration options for the scheduler
func schedulerConfigTableSchema() *memdb.TableSchema {
Expand All @@ -611,10 +618,22 @@ func schedulerConfigTableSchema() *memdb.TableSchema {
Name: "id",
AllowMissing: true,
Unique: true,
// This indexer ensures that this table is a singleton
Indexer: &memdb.ConditionalIndex{
Conditional: func(obj interface{}) (bool, error) { return true, nil },
},
Indexer: singleRecord, // we store only 1 scheduler config
},
},
}
}

// clusterMetaTableSchema returns the MemDB schema for the scheduler config table.
func clusterMetaTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "cluster_meta",
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: singleRecord, // we store only 1 cluster metadata
},
},
}
Expand Down
52 changes: 52 additions & 0 deletions nomad/state/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

memdb "github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require"
)

func TestStateStoreSchema(t *testing.T) {
Expand All @@ -13,3 +14,54 @@ func TestStateStoreSchema(t *testing.T) {
t.Fatalf("err: %v", err)
}
}

func TestState_singleRecord(t *testing.T) {
require := require.New(t)

db, err := memdb.NewMemDB(&memdb.DBSchema{
Tables: map[string]*memdb.TableSchema{
"example": {
Name: "example",
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: singleRecord,
},
},
},
},
})
require.NoError(err)

count := func() int {
txn := db.Txn(false)
defer txn.Abort()

iter, err := txn.Get("example", "id")
require.NoError(err)

num := 0
for item := iter.Next(); item != nil; item = iter.Next() {
num++
}
return num
}

insert := func(s string) {
txn := db.Txn(true)
err := txn.Insert("example", s)
require.NoError(err)
txn.Commit()
}

insert("one")
require.Equal(1, count())

insert("two")
require.Equal(1, count())

insert("three")
require.Equal(1, count())
}
Loading

0 comments on commit f5477c9

Please sign in to comment.