From 0ed92a665107e3fb0b3b445a4e16f3095de01837 Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Mon, 5 Oct 2020 19:40:06 -0400 Subject: [PATCH] event durability count and cfg --- command/agent/config.go | 9 +++++++++ command/agent/config_parse_test.go | 1 + command/agent/testdata/basic.hcl | 1 + command/agent/testdata/basic.json | 1 + nomad/fsm.go | 27 +++++++++++++++------------ nomad/fsm_test.go | 7 +++---- nomad/state/state_changes.go | 6 ++---- nomad/state/state_store.go | 9 +++++---- nomad/state/testing.go | 8 ++++---- 9 files changed, 41 insertions(+), 28 deletions(-) diff --git a/command/agent/config.go b/command/agent/config.go index 703dda49882..82c3b7089db 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -488,6 +488,10 @@ type ServerConfig struct { // will generate events for its event stream. EnableEventPublisher bool `hcl:"enable_event_publisher"` + // DurableEventCount specifies the amount of events to persist during snapshot generation. + // A count of 0 signals that no events should be persisted. + DurableEventCount int `hcl:"durable_event_count"` + // ExtraKeysHCL is used by hcl to surface unexpected keys ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"` } @@ -888,6 +892,7 @@ func DefaultConfig() *Config { Server: &ServerConfig{ Enabled: false, EnableEventPublisher: true, + DurableEventCount: 100, StartJoin: []string{}, ServerJoin: &ServerJoin{ RetryJoin: []string{}, @@ -1416,6 +1421,10 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig { result.EnableEventPublisher = true } + if b.DurableEventCount != 0 { + result.DurableEventCount = b.DurableEventCount + } + if b.DefaultSchedulerConfig != nil { c := *b.DefaultSchedulerConfig result.DefaultSchedulerConfig = &c diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go index 4761c8a1c01..6db5cd92067 100644 --- a/command/agent/config_parse_test.go +++ b/command/agent/config_parse_test.go @@ -123,6 +123,7 @@ var basicConfig = &Config{ UpgradeVersion: "0.8.0", EncryptKey: "abc", EnableEventPublisher: true, + DurableEventCount: 100, ServerJoin: &ServerJoin{ RetryJoin: []string{"1.1.1.1", "2.2.2.2"}, RetryInterval: time.Duration(15) * time.Second, diff --git a/command/agent/testdata/basic.hcl b/command/agent/testdata/basic.hcl index 7e2edcb3ebd..e5f6798cfb8 100644 --- a/command/agent/testdata/basic.hcl +++ b/command/agent/testdata/basic.hcl @@ -131,6 +131,7 @@ server { encrypt = "abc" raft_multiplier = 4 enable_event_publisher = true + durable_event_count = 100 server_join { retry_join = ["1.1.1.1", "2.2.2.2"] diff --git a/command/agent/testdata/basic.json b/command/agent/testdata/basic.json index 3d50f3511b2..d474a79f913 100644 --- a/command/agent/testdata/basic.json +++ b/command/agent/testdata/basic.json @@ -262,6 +262,7 @@ "deployment_gc_threshold": "12h", "enabled": true, "enable_event_publisher": true, + "durable_event_count": 100, "enabled_schedulers": [ "test" ], diff --git a/nomad/fsm.go b/nomad/fsm.go index ce5caaafd5a..db1aee98cf7 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -128,17 +128,21 @@ type FSMConfig struct { Region string EnableEventPublisher bool + + // Durable count specifies the amount of events generated by the state store + // to save to disk during snapshot generation. The most recent events + // limited to count will be saved. + DurableEventCount int } // NewFSMPath is used to construct a new FSM with a blank state func NewFSM(config *FSMConfig) (*nomadFSM, error) { // Create a state store sconfig := &state.StateStoreConfig{ - Logger: config.Logger, - Region: config.Region, - EnablePublisher: config.EnableEventPublisher, - // TODO(drew) plumb cfg - EnableDurability: true, + Logger: config.Logger, + Region: config.Region, + EnablePublisher: config.EnableEventPublisher, + DurableEventCount: config.DurableEventCount, } state, err := state.NewStateStore(sconfig) if err != nil { @@ -1269,11 +1273,10 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { // Create a new state store config := &state.StateStoreConfig{ - Logger: n.config.Logger, - Region: n.config.Region, - EnablePublisher: n.config.EnableEventPublisher, - // TODO(drew) plumb cfg - EnableDurability: true, + Logger: n.config.Logger, + Region: n.config.Region, + EnablePublisher: n.config.EnableEventPublisher, + DurableEventCount: n.config.DurableEventCount, } newState, err := state.NewStateStore(config) if err != nil { @@ -2363,10 +2366,10 @@ func (s *nomadSnapshot) persistCSIVolumes(sink raft.SnapshotSink, func (s *nomadSnapshot) persistEvents(sink raft.SnapshotSink, encoder *codec.Encoder) error { var durableCount int - if s.snap.Config() != nil && !s.snap.Config().EnableDurability { + if s.snap.Config() != nil && s.snap.Config().DurableEventCount == 0 { return nil } else { - durableCount = s.snap.Config().DurableCount + durableCount = s.snap.Config().DurableEventCount } events, err := s.snap.LatestEventsReverse(nil) diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 81e583568aa..98b82feac5b 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -3206,9 +3206,8 @@ func TestFSM_SnapshotRestore_Events_WithDurability(t *testing.T) { fsm := testFSM(t) state := fsm.State() cfg := state.Config() - cfg.EnableDurability = true - // DurableCount = 4 each mock events wrapper contains 2 events - cfg.DurableCount = 4 + // DurableEventCount = 4 each mock events wrapper contains 2 events + cfg.DurableEventCount = 4 e1 := mock.Events(1000) e2 := mock.Events(1001) @@ -3251,7 +3250,7 @@ func TestFSM_SnapshotRestore_Events_NoDurability(t *testing.T) { fsm := testFSM(t) state := fsm.State() cfg := state.Config() - cfg.EnableDurability = false + cfg.DurableEventCount = 0 e1 := mock.Events(1000) e2 := mock.Events(1001) diff --git a/nomad/state/state_changes.go b/nomad/state/state_changes.go index a8c48b1ff1f..6e4903f024d 100644 --- a/nomad/state/state_changes.go +++ b/nomad/state/state_changes.go @@ -42,8 +42,7 @@ type changeTrackerDB struct { // ChangeConfig type ChangeConfig struct { - DurableEvents bool - DurableCount int + DurableEventCount int } func NewChangeTrackerDB(db *memdb.MemDB, publisher *stream.EventPublisher, changesFn changeProcessor, cfg *ChangeConfig) *changeTrackerDB { @@ -55,8 +54,7 @@ func NewChangeTrackerDB(db *memdb.MemDB, publisher *stream.EventPublisher, chang db: db, publisher: publisher, processChanges: changesFn, - durableEvents: cfg.DurableEvents, - durableCount: cfg.DurableCount, + durableCount: cfg.DurableEventCount, } } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 3fd0a732166..08ae6655158 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -46,10 +46,12 @@ type StateStoreConfig struct { // Region is the region of the server embedding the state store. Region string + // EnablePublisher is used to enable or disable the event publisher EnablePublisher bool - EnableDurability bool - DurableCount int + // DurableEventCount is the amount of events to persist during the snapshot + // process. + DurableEventCount int } // The StateStore is responsible for maintaining all the Nomad @@ -94,8 +96,7 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) { if config.EnablePublisher { cfg := &ChangeConfig{ - DurableEvents: config.EnableDurability, - DurableCount: 1000, + DurableEventCount: 1000, } publisher := stream.NewEventPublisher(ctx, stream.EventPublisherCfg{ EventBufferTTL: 1 * time.Hour, diff --git a/nomad/state/testing.go b/nomad/state/testing.go index 86acaf9e782..19421cf4231 100644 --- a/nomad/state/testing.go +++ b/nomad/state/testing.go @@ -26,10 +26,10 @@ func TestStateStore(t testing.T) *StateStore { func TestStateStorePublisher(t testing.T) *StateStoreConfig { return &StateStoreConfig{ - Logger: testlog.HCLogger(t), - Region: "global", - EnablePublisher: true, - EnableDurability: true, + Logger: testlog.HCLogger(t), + Region: "global", + EnablePublisher: true, + DurableEventCount: 100, } } func TestStateStoreCfg(t testing.T, cfg *StateStoreConfig) *StateStore {