From e73511768fb2778cfdf81525655d51ccd86adcd1 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Wed, 15 Dec 2021 15:17:47 -0500 Subject: [PATCH] cli: stream raft logs to operator raft logs subcommand The `nomad operator raft logs` command uses a raft helper that reads in the logs from raft and serializes them to JSON. The previous implementation returned the slice of all logs and then serializes the entire object. Update the helper to stream the log entries and then serialize them as newline-delimited JSON. --- command/operator_raft_logs.go | 53 ++++++++++++++++++++++++++++------- helper/raftutil/state.go | 51 ++++++++++++++++++++------------- 2 files changed, 74 insertions(+), 30 deletions(-) diff --git a/command/operator_raft_logs.go b/command/operator_raft_logs.go index fe3233ee6c9..006827aa659 100644 --- a/command/operator_raft_logs.go +++ b/command/operator_raft_logs.go @@ -28,6 +28,11 @@ Usage: nomad operator raft logs This is a low-level debugging tool and not subject to Nomad's usual backward compatibility guarantees. +Raft Logs Options: + + -pretty + By default this command outputs newline delimited JSON. If the -pretty flag + is passed, each entry will be pretty-printed. ` return strings.TrimSpace(helpText) } @@ -47,10 +52,20 @@ func (c *OperatorRaftLogsCommand) Synopsis() string { func (c *OperatorRaftLogsCommand) Name() string { return "operator raft logs" } func (c *OperatorRaftLogsCommand) Run(args []string) int { - if len(args) != 1 { + + var pretty bool + flagSet := c.Meta.FlagSet(c.Name(), FlagSetClient) + flagSet.Usage = func() { c.Ui.Output(c.Help()) } + flagSet.BoolVar(&pretty, "pretty", false, "") + + if err := flagSet.Parse(args); err != nil { + return 1 + } + + args = flagSet.Args() + if l := len(args); l != 1 { c.Ui.Error("This command takes one argument: ") c.Ui.Error(commandErrorText(c)) - return 1 } @@ -60,21 +75,39 @@ func (c *OperatorRaftLogsCommand) Run(args []string) int { return 1 } - logs, warnings, err := raftutil.LogEntries(raftPath) + enc := json.NewEncoder(os.Stdout) + if pretty { + enc.SetIndent("", " ") + } + + logChan, warningsChan, err := raftutil.LogEntries(raftPath) if err != nil { c.Ui.Error(err.Error()) return 1 } - for _, warning := range warnings { - c.Ui.Error(warning.Error()) + // so that the warnings don't end up mixed into the JSON stream, + // collect them and print them once we're done + warnings := []error{} + +DONE: + for { + select { + case log := <-logChan: + if log == nil { + break DONE // no more logs, but break to print warnings + } + if err := enc.Encode(log); err != nil { + c.Ui.Error(fmt.Sprintf("failed to encode output: %v", err)) + return 1 + } + case warning := <-warningsChan: + warnings = append(warnings, warning) + } } - enc := json.NewEncoder(os.Stdout) - enc.SetIndent("", " ") - if err := enc.Encode(logs); err != nil { - c.Ui.Error(fmt.Sprintf("failed to encode output: %v", err)) - return 1 + for _, warning := range warnings { + c.Ui.Error(warning.Error()) } return 0 diff --git a/helper/raftutil/state.go b/helper/raftutil/state.go index 1206fbdba46..f902a32efba 100644 --- a/helper/raftutil/state.go +++ b/helper/raftutil/state.go @@ -32,33 +32,44 @@ func RaftStateInfo(p string) (store *raftboltdb.BoltStore, firstIdx uint64, last return s, firstIdx, lastIdx, nil } -// LogEntries returns the log entries as found in raft log in the passed data-dir directory -func LogEntries(p string) (logs []interface{}, warnings []error, err error) { +// LogEntries reads the raft logs found in the data directory found at +// the path `p`, and returns a channel of logs, and a channel of +// warnings. If opening the raft state returns an error, both channels +// will be nil. +func LogEntries(p string) (<-chan interface{}, <-chan error, error) { store, firstIdx, lastIdx, err := RaftStateInfo(p) if err != nil { return nil, nil, fmt.Errorf("failed to open raft logs: %v", err) } - defer store.Close() - result := make([]interface{}, 0, lastIdx-firstIdx+1) - for i := firstIdx; i <= lastIdx; i++ { - var e raft.Log - err := store.GetLog(i, &e) - if err != nil { - warnings = append(warnings, fmt.Errorf("failed to read log entry at index %d (firstIdx: %d, lastIdx: %d): %v", i, firstIdx, lastIdx, err)) - continue - } - - m, err := decode(&e) - if err != nil { - warnings = append(warnings, fmt.Errorf("failed to decode log entry at index %d: %v", i, err)) - continue + entries := make(chan interface{}) + warnings := make(chan error) + + go func() { + defer store.Close() + defer close(entries) + for i := firstIdx; i <= lastIdx; i++ { + var e raft.Log + err := store.GetLog(i, &e) + if err != nil { + warnings <- fmt.Errorf( + "failed to read log entry at index %d (firstIdx: %d, lastIdx: %d): %v", + i, firstIdx, lastIdx, err) + continue + } + + entry, err := decode(&e) + if err != nil { + warnings <- fmt.Errorf( + "failed to decode log entry at index %d: %v", i, err) + continue + } + + entries <- entry } + }() - result = append(result, m) - } - - return result, warnings, nil + return entries, warnings, nil } type logMessage struct {