Skip to content

Commit

Permalink
cli: stream raft logs to operator raft logs subcommand (#11684)
Browse files Browse the repository at this point in the history
The `nomad operator raft logs` command uses a raft helper that reads
in the logs from raft and serializes them to JSON. The previous
implementation returned the slice of all logs and then serializes the
entire object. Update the helper to stream the log entries and then
serialize them as newline-delimited JSON.
  • Loading branch information
tgross authored Dec 16, 2021
1 parent 03ea7d1 commit bd18a45
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 30 deletions.
53 changes: 43 additions & 10 deletions command/operator_raft_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ Usage: nomad operator raft logs <path to nomad data dir>
This is a low-level debugging tool and not subject to Nomad's usual backward
compatibility guarantees.
Raft Logs Options:
-pretty
By default this command outputs newline delimited JSON. If the -pretty flag
is passed, each entry will be pretty-printed.
`
return strings.TrimSpace(helpText)
}
Expand All @@ -47,10 +52,20 @@ func (c *OperatorRaftLogsCommand) Synopsis() string {
func (c *OperatorRaftLogsCommand) Name() string { return "operator raft logs" }

func (c *OperatorRaftLogsCommand) Run(args []string) int {
if len(args) != 1 {

var pretty bool
flagSet := c.Meta.FlagSet(c.Name(), FlagSetClient)
flagSet.Usage = func() { c.Ui.Output(c.Help()) }
flagSet.BoolVar(&pretty, "pretty", false, "")

if err := flagSet.Parse(args); err != nil {
return 1
}

args = flagSet.Args()
if l := len(args); l != 1 {
c.Ui.Error("This command takes one argument: <path>")
c.Ui.Error(commandErrorText(c))

return 1
}

Expand All @@ -60,21 +75,39 @@ func (c *OperatorRaftLogsCommand) Run(args []string) int {
return 1
}

logs, warnings, err := raftutil.LogEntries(raftPath)
enc := json.NewEncoder(os.Stdout)
if pretty {
enc.SetIndent("", " ")
}

logChan, warningsChan, err := raftutil.LogEntries(raftPath)
if err != nil {
c.Ui.Error(err.Error())
return 1
}

for _, warning := range warnings {
c.Ui.Error(warning.Error())
// so that the warnings don't end up mixed into the JSON stream,
// collect them and print them once we're done
warnings := []error{}

DONE:
for {
select {
case log := <-logChan:
if log == nil {
break DONE // no more logs, but break to print warnings
}
if err := enc.Encode(log); err != nil {
c.Ui.Error(fmt.Sprintf("failed to encode output: %v", err))
return 1
}
case warning := <-warningsChan:
warnings = append(warnings, warning)
}
}

enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
if err := enc.Encode(logs); err != nil {
c.Ui.Error(fmt.Sprintf("failed to encode output: %v", err))
return 1
for _, warning := range warnings {
c.Ui.Error(warning.Error())
}

return 0
Expand Down
51 changes: 31 additions & 20 deletions helper/raftutil/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,33 +32,44 @@ func RaftStateInfo(p string) (store *raftboltdb.BoltStore, firstIdx uint64, last
return s, firstIdx, lastIdx, nil
}

// LogEntries returns the log entries as found in raft log in the passed data-dir directory
func LogEntries(p string) (logs []interface{}, warnings []error, err error) {
// LogEntries reads the raft logs found in the data directory found at
// the path `p`, and returns a channel of logs, and a channel of
// warnings. If opening the raft state returns an error, both channels
// will be nil.
func LogEntries(p string) (<-chan interface{}, <-chan error, error) {
store, firstIdx, lastIdx, err := RaftStateInfo(p)
if err != nil {
return nil, nil, fmt.Errorf("failed to open raft logs: %v", err)
}
defer store.Close()

result := make([]interface{}, 0, lastIdx-firstIdx+1)
for i := firstIdx; i <= lastIdx; i++ {
var e raft.Log
err := store.GetLog(i, &e)
if err != nil {
warnings = append(warnings, fmt.Errorf("failed to read log entry at index %d (firstIdx: %d, lastIdx: %d): %v", i, firstIdx, lastIdx, err))
continue
}

m, err := decode(&e)
if err != nil {
warnings = append(warnings, fmt.Errorf("failed to decode log entry at index %d: %v", i, err))
continue
entries := make(chan interface{})
warnings := make(chan error)

go func() {
defer store.Close()
defer close(entries)
for i := firstIdx; i <= lastIdx; i++ {
var e raft.Log
err := store.GetLog(i, &e)
if err != nil {
warnings <- fmt.Errorf(
"failed to read log entry at index %d (firstIdx: %d, lastIdx: %d): %v",
i, firstIdx, lastIdx, err)
continue
}

entry, err := decode(&e)
if err != nil {
warnings <- fmt.Errorf(
"failed to decode log entry at index %d: %v", i, err)
continue
}

entries <- entry
}
}()

result = append(result, m)
}

return result, warnings, nil
return entries, warnings, nil
}

type logMessage struct {
Expand Down

0 comments on commit bd18a45

Please sign in to comment.