Skip to content

Commit

Permalink
Raft Debugging Improvements (#11414)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mahmood Ali authored Nov 4, 2021
1 parent 8662dd8 commit 68bae12
Show file tree
Hide file tree
Showing 8 changed files with 400 additions and 56 deletions.
5 changes: 5 additions & 0 deletions command/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"operator snapshot _state": func() (cli.Command, error) {
return &OperatorSnapshotStateCommand{
Meta: meta,
}, nil
},
"operator snapshot restore": func() (cli.Command, error) {
return &OperatorSnapshotRestoreCommand{
Meta: meta,
Expand Down
10 changes: 9 additions & 1 deletion command/operator_raft_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,20 @@ func (c *OperatorRaftStateCommand) Run(args []string) int {
return 1
}

state, err := raftutil.FSMState(raftPath, fLastIdx)
fsm, err := raftutil.NewFSM(raftPath)
if err != nil {
c.Ui.Error(err.Error())
return 1
}
defer fsm.Close()

_, _, err = fsm.ApplyAll()
if err != nil {
c.Ui.Error(err.Error())
return 1
}

state := fsm.StateAsMap()
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
if err := enc.Encode(state); err != nil {
Expand Down
77 changes: 77 additions & 0 deletions command/operator_snapshot_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package command

import (
"encoding/json"
"fmt"
"os"
"strings"

"github.com/hashicorp/nomad/helper/raftutil"
"github.com/posener/complete"
)

type OperatorSnapshotStateCommand struct {
Meta
}

func (c *OperatorSnapshotStateCommand) Help() string {
helpText := `
Usage: nomad operator snapshot _state <file>
Displays a JSON representation of state in the snapshot.
To inspect the file "backup.snap":
$ nomad operator snapshot _state backup.snap
`
return strings.TrimSpace(helpText)
}

func (c *OperatorSnapshotStateCommand) AutocompleteFlags() complete.Flags {
return complete.Flags{}
}

func (c *OperatorSnapshotStateCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictNothing
}

func (c *OperatorSnapshotStateCommand) Synopsis() string {
return "Displays information about a Nomad snapshot file"
}

func (c *OperatorSnapshotStateCommand) Name() string { return "operator snapshot _state" }

func (c *OperatorSnapshotStateCommand) Run(args []string) int {
// Check that we either got no filename or exactly one.
if len(args) != 1 {
c.Ui.Error("This command takes one argument: <file>")
c.Ui.Error(commandErrorText(c))
return 1
}

path := args[0]
f, err := os.Open(path)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error opening snapshot file: %s", err))
return 1
}
defer f.Close()

state, meta, err := raftutil.RestoreFromArchive(f)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to read archive file: %s", err))
return 1
}

sm := raftutil.StateAsMap(state)
sm["SnapshotMeta"] = []interface{}{meta}

enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
if err := enc.Encode(sm); err != nil {
c.Ui.Error(fmt.Sprintf("Failed to encode output: %v", err))
return 1
}

return 0
}
204 changes: 150 additions & 54 deletions helper/raftutil/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,84 @@ package raftutil

import (
"fmt"
"os"
"io"
"path/filepath"
"strings"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
)

// FSMState returns a dump of the FSM state as found in data-dir, as of lastIndx value
func FSMState(p string, plastIdx int64) (interface{}, error) {
var ErrNoMoreLogs = fmt.Errorf("no more logs")

type nomadFSM interface {
raft.FSM
State() *state.StateStore
Restore(io.ReadCloser) error
}

type FSMHelper struct {
path string

logger hclog.Logger

// nomad state
store *raftboltdb.BoltStore
fsm nomadFSM
snaps *raft.FileSnapshotStore

// raft
logFirstIdx uint64
logLastIdx uint64
nextIdx uint64
}

func NewFSM(p string) (*FSMHelper, error) {
store, firstIdx, lastIdx, err := RaftStateInfo(filepath.Join(p, "raft.db"))
if err != nil {
return nil, fmt.Errorf("failed to open raft database %v: %v", p, err)
}
defer store.Close()

snaps, err := raft.NewFileSnapshotStore(p, 1000, os.Stderr)
logger := hclog.L()

snaps, err := raft.NewFileSnapshotStoreWithLogger(p, 1000, logger)
if err != nil {
store.Close()
return nil, fmt.Errorf("failed to open snapshot dir: %v", err)
}

logger := hclog.L()
fsm, err := dummyFSM(logger)
if err != nil {
store.Close()
return nil, err
}

return &FSMHelper{
path: p,
logger: logger,
store: store,
fsm: fsm,
snaps: snaps,

logFirstIdx: firstIdx,
logLastIdx: lastIdx,
nextIdx: uint64(1),
}, nil
}

func dummyFSM(logger hclog.Logger) (nomadFSM, error) {
// use dummy non-enabled FSM dependencies
periodicDispatch := nomad.NewPeriodicDispatch(logger, nil)
blockedEvals := nomad.NewBlockedEvals(nil, logger)
evalBroker, err := nomad.NewEvalBroker(1, 1, 1, 1)
if err != nil {
return nil, err
}

fsmConfig := &nomad.FSMConfig{
EvalBroker: evalBroker,
Periodic: periodicDispatch,
Expand All @@ -41,38 +88,104 @@ func FSMState(p string, plastIdx int64) (interface{}, error) {
Region: "default",
}

fsm, err := nomad.NewFSM(fsmConfig)
if err != nil {
return nil, err
return nomad.NewFSM(fsmConfig)
}

func (f *FSMHelper) Close() {
f.store.Close()

}

func (f *FSMHelper) ApplyNext() (index uint64, term uint64, err error) {
if f.nextIdx == 1 {
// check snapshots first
index, term, err := f.restoreFromSnapshot()
if err != nil {
return 0, 0, err
}

if index != 0 {
f.nextIdx = index + 1
return index, term, nil
}
}

if f.nextIdx < f.logFirstIdx {
return 0, 0, fmt.Errorf("missing logs [%v, %v]", f.nextIdx, f.logFirstIdx-1)
}

// restore from snapshot first
sFirstIdx, err := restoreFromSnapshot(fsm, snaps, logger)
if f.nextIdx > f.logLastIdx {
return 0, 0, ErrNoMoreLogs
}

var e raft.Log
err = f.store.GetLog(f.nextIdx, &e)
if err != nil {
return nil, err
return 0, 0, fmt.Errorf("failed to read log entry at index %d: %v", f.nextIdx, err)
}

if sFirstIdx+1 < firstIdx {
return nil, fmt.Errorf("missing logs after snapshot [%v,%v]", sFirstIdx+1, firstIdx-1)
} else if sFirstIdx > 0 {
firstIdx = sFirstIdx + 1
defer func() {
r := recover()
if r != nil && strings.HasPrefix(fmt.Sprint(r), "failed to apply request") {
// Enterprise specific log entries will fail to load in OSS repository with "failed to apply request."
// If not relevant to investigation, we can ignore them and simply worn.
f.logger.Warn("failed to apply log; loading Enterprise data-dir in OSS binary?", "index", e.Index)

f.nextIdx++
} else if r != nil {
panic(r)
}
}()

if e.Type == raft.LogCommand {
f.fsm.Apply(&e)
}

lastIdx = lastIndex(lastIdx, plastIdx)
f.nextIdx++
return e.Index, e.Term, nil
}

for i := firstIdx; i <= lastIdx; i++ {
var e raft.Log
err := store.GetLog(i, &e)
if err != nil {
return nil, fmt.Errorf("failed to read log entry at index %d: %v, firstIdx: %d, lastIdx: %d", i, err, firstIdx, lastIdx)
// ApplyUntil applies all raft entries until (inclusive) the passed index.
func (f *FSMHelper) ApplyUntil(stopIdx uint64) (idx uint64, term uint64, err error) {
var lastIdx, lastTerm uint64
for {
idx, term, err := f.ApplyNext()
if err == ErrNoMoreLogs {
return lastIdx, lastTerm, nil
} else if err != nil {
return lastIdx, lastTerm, err
} else if idx >= stopIdx {
return lastIdx, lastTerm, nil
}

if e.Type == raft.LogCommand {
fsm.Apply(&e)
lastIdx, lastTerm = idx, term
}
}

func (f *FSMHelper) ApplyAll() (index uint64, term uint64, err error) {
var lastIdx, lastTerm uint64
for {
idx, term, err := f.ApplyNext()
if err == ErrNoMoreLogs {
return lastIdx, lastTerm, nil
} else if err != nil {
return lastIdx, lastTerm, err
}

lastIdx, lastTerm = idx, term
}
}

state := fsm.State()
func (f *FSMHelper) State() *state.StateStore {
return f.fsm.State()
}

func (f *FSMHelper) StateAsMap() map[string][]interface{} {
return StateAsMap(f.fsm.State())
}

// StateAsMap returns a json-able representation of the state
func StateAsMap(state *state.StateStore) map[string][]interface{} {
result := map[string][]interface{}{
"ACLPolicies": toArray(state.ACLPolicies(nil)),
"ACLTokens": toArray(state.ACLTokens(nil)),
Expand All @@ -95,52 +208,35 @@ func FSMState(p string, plastIdx int64) (interface{}, error) {

insertEnterpriseState(result, state)

return result, nil
return result

}

func restoreFromSnapshot(fsm raft.FSM, snaps raft.SnapshotStore, logger hclog.Logger) (uint64, error) {
logger = logger.Named("restoreFromSnapshot")
snapshots, err := snaps.List()
func (f *FSMHelper) restoreFromSnapshot() (index uint64, term uint64, err error) {
snapshots, err := f.snaps.List()
if err != nil {
return 0, err
return 0, 0, err
}
logger.Debug("found snapshots", "count", len(snapshots))
f.logger.Debug("found snapshots", "count", len(snapshots))

for _, snapshot := range snapshots {
_, source, err := snaps.Open(snapshot.ID)
_, source, err := f.snaps.Open(snapshot.ID)
if err != nil {
logger.Warn("failed to open a snapshot", "snapshot_id", snapshot.ID, "error", err)
f.logger.Warn("failed to open a snapshot", "snapshot_id", snapshot.ID, "error", err)
continue
}

err = fsm.Restore(source)
err = f.fsm.Restore(source)
source.Close()
if err != nil {
logger.Warn("failed to restore a snapshot", "snapshot_id", snapshot.ID, "error", err)
f.logger.Warn("failed to restore a snapshot", "snapshot_id", snapshot.ID, "error", err)
continue
}

return snapshot.Index, nil
return snapshot.Index, snapshot.Term, nil
}

return 0, nil
}

func lastIndex(raftLastIdx uint64, cliLastIdx int64) uint64 {
switch {
case cliLastIdx < 0:
if raftLastIdx > uint64(-cliLastIdx) {
return raftLastIdx - uint64(-cliLastIdx)
} else {
return 0
}
case cliLastIdx == 0:
return raftLastIdx
case uint64(cliLastIdx) < raftLastIdx:
return uint64(cliLastIdx)
default:
return raftLastIdx
}
return 0, 0, nil
}

func toArray(iter memdb.ResultIterator, err error) []interface{} {
Expand Down
Loading

0 comments on commit 68bae12

Please sign in to comment.