Skip to content

Commit

Permalink
rehydrage event publisher on snapshot restore
Browse files Browse the repository at this point in the history
  • Loading branch information
drewbailey committed Oct 7, 2020
1 parent 0dd32f4 commit 65bcca6
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 24 deletions.
43 changes: 27 additions & 16 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1564,22 +1564,33 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {

// Rehydrate the new state store's event publisher with the events
// persisted in the snapshot
// pub, err := n.state.EventPublisher()
// if err != nil {
// n.logger.Warn("Snapshot Restore: new state event publisher not configured")
// }
// events, err := n.state.Events(nil)
// if err != nil {
// n.logger.Warn("Snapshot Restore: unable to retrieve current events")
// }
// for {
// raw := events.Next()
// if raw == nil {
// break
// }
// e := raw.(*structs.Events)
// pub.Publish(e)
// }
if n.config.EnableEventPublisher {
if err := rehydratePublisherFromState(n.state); err != nil {
n.logger.Error("Error re-hydrating event publisher during restore", "error", err)
}
}

return nil
}

func rehydratePublisherFromState(s *state.StateStore) error {
pub, err := s.EventPublisher()
if err != nil {
return err
}

events, err := s.Events(nil)
if err != nil {
return err
}
for {
raw := events.Next()
if raw == nil {
break
}
e := raw.(*structs.Events)
pub.Publish(*e)
}
return nil
}

Expand Down
28 changes: 21 additions & 7 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nomad

import (
"bytes"
"errors"
"fmt"
"reflect"
"strings"
Expand Down Expand Up @@ -50,11 +51,12 @@ func testFSM(t *testing.T) *nomadFSM {
dispatcher, _ := testPeriodicDispatcher(t)
logger := testlog.HCLogger(t)
fsmConfig := &FSMConfig{
EvalBroker: broker,
Periodic: dispatcher,
Blocked: NewBlockedEvals(broker, logger),
Logger: logger,
Region: "global",
EvalBroker: broker,
Periodic: dispatcher,
Blocked: NewBlockedEvals(broker, logger),
Logger: logger,
Region: "global",
EnableEventPublisher: true,
}
fsm, err := NewFSM(fsmConfig)
if err != nil {
Expand Down Expand Up @@ -3204,6 +3206,8 @@ func TestFSM_SnapshotRestore_Events_WithDurability(t *testing.T) {
t.Parallel()
// Add some state
fsm := testFSM(t)
fsm.config.EnableEventPublisher = true

state := fsm.State()
cfg := state.Config()
// DurableEventCount = 4 each mock events wrapper contains 2 events
Expand Down Expand Up @@ -3242,15 +3246,25 @@ func TestFSM_SnapshotRestore_Events_WithDurability(t *testing.T) {
// Durable count was 4 so e1 events should be excluded
raw1 := iter.Next()
require.Nil(t, raw1)

pub, err := state2.EventPublisher()
require.NoError(t, err)
testutil.WaitForResult(func() (bool, error) {
if pub.Len() == 2 {
return true, nil
}
return false, errors.New("expected publisher to be populated")
}, func(err error) {
require.Fail(t, err.Error())
})
}

func TestFSM_SnapshotRestore_Events_NoDurability(t *testing.T) {
t.Parallel()
// Add some state
fsm := testFSM(t)
fsm.config.EnableEventPublisher = false
state := fsm.State()
cfg := state.Config()
cfg.DurableEventCount = 0

e1 := mock.Events(1000)
e2 := mock.Events(1001)
Expand Down
2 changes: 1 addition & 1 deletion nomad/state/node_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func TestNodeDrainEventFromChanges(t *testing.T) {
got, err := processDBChanges(tx, changes)
require.NoError(t, err)

require.Len(t, got, 1)
require.Len(t, got.Events, 1)

require.Equal(t, TopicNode, got.Events[0].Topic)
require.Equal(t, TypeNodeDrain, got.Events[0].Type)
Expand Down
4 changes: 4 additions & 0 deletions nomad/stream/event_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ func NewEventPublisher(ctx context.Context, cfg EventPublisherCfg) *EventPublish
return e
}

func (e *EventPublisher) Len() int {
return e.eventBuf.Len()
}

// Publish events to all subscribers of the event Topic.
func (e *EventPublisher) Publish(events structs.Events) {
if len(events.Events) > 0 {
Expand Down

0 comments on commit 65bcca6

Please sign in to comment.