Skip to content

Commit

Permalink
cli: Add event stream capture to nomad operator debug (#11865)
Browse files Browse the repository at this point in the history
  • Loading branch information
davemay99 authored Jan 18, 2022
1 parent dc81f26 commit 8d28bfe
Show file tree
Hide file tree
Showing 4 changed files with 432 additions and 7 deletions.
3 changes: 3 additions & 0 deletions .changelog/11865.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
cli: Add event stream capture to `nomad operator debug`
```
231 changes: 224 additions & 7 deletions command/operator_debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"flag"
"fmt"
"html/template"
Expand All @@ -21,6 +22,7 @@ import (
"time"

"github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
"github.com/hashicorp/nomad/helper"
Expand All @@ -42,12 +44,15 @@ type OperatorDebugCommand struct {
nodeClass string
nodeIDs []string
serverIDs []string
topics map[api.Topic][]string
index uint64
consul *external
vault *external
manifest []string
ctx context.Context
cancel context.CancelFunc
opts *api.QueryOptions
verbose bool
}

const (
Expand All @@ -73,6 +78,11 @@ Usage: nomad operator debug [options]
token will also require 'agent:write', or enable_debug configuration set to
true.
If event stream capture is enabled, the Job, Allocation, Deployment,
and Evaluation topics require 'namespace:read-job' capabilities, the Node
topic requires 'node:read'. A 'management' token is required to capture
ACLToken, ACLPolicy, or all all events.
General Options:
` + generalOptionsUsage(usageOptsDefault|usageOptsNoNamespace) + `
Expand Down Expand Up @@ -137,7 +147,20 @@ Debug Options:
-duration=<duration>
Set the duration of the debug capture. Logs will be captured from specified servers and
nodes at "log-level". Defaults to 2m.
nodes at "log-level". Defaults to 2m.
-event-index=<index>
Specifies the index to start streaming events from. If the requested index is
no longer in the buffer the stream will start at the next available index.
Defaults to 0.
-event-topic=<Allocation,Evaluation,Job,Node,*>:<filter>
Enable event stream capture, filtered by comma delimited list of topic filters.
Examples:
"all" or "*:*" for all events
"Evaluation" or "Evaluation:*" for all evaluation events
"*:example" for all events related to the job "example"
Defaults to "none" (disabled).
-interval=<interval>
The interval between snapshots of the Nomad state. Set interval equal to
Expand Down Expand Up @@ -173,7 +196,10 @@ Debug Options:
-output=<path>
Path to the parent directory of the output directory. If specified, no
archive is built. Defaults to the current directory.
archive is built. Defaults to the current directory.
-verbose
Enable verbose output.
`
return strings.TrimSpace(helpText)
}
Expand All @@ -186,6 +212,8 @@ func (c *OperatorDebugCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-duration": complete.PredictAnything,
"-event-index": complete.PredictAnything,
"-event-topic": complete.PredictAnything,
"-interval": complete.PredictAnything,
"-log-level": complete.PredictSet("TRACE", "DEBUG", "INFO", "WARN", "ERROR"),
"-max-nodes": complete.PredictAnything,
Expand All @@ -196,6 +224,7 @@ func (c *OperatorDebugCommand) AutocompleteFlags() complete.Flags {
"-pprof-duration": complete.PredictAnything,
"-consul-token": complete.PredictAnything,
"-vault-token": complete.PredictAnything,
"-verbose": complete.PredictAnything,
})
}

Expand Down Expand Up @@ -225,7 +254,7 @@ func NodePredictor(factory ApiClientFactory) complete.Predictor {
}

// NodeClassPredictor returns a client node class predictor
// TODO: Consider API options for node class filtering
// TODO dmay: Consider API options for node class filtering
func NodeClassPredictor(factory ApiClientFactory) complete.Predictor {
return complete.PredictFunc(func(a complete.Args) []string {
client, err := factory()
Expand Down Expand Up @@ -261,7 +290,7 @@ func NodeClassPredictor(factory ApiClientFactory) complete.Predictor {
}

// ServerPredictor returns a server member predictor
// TODO: Consider API options for server member filtering
// TODO dmay: Consider API options for server member filtering
func ServerPredictor(factory ApiClientFactory) complete.Predictor {
return complete.PredictFunc(func(a complete.Args) []string {
client, err := factory()
Expand Down Expand Up @@ -305,11 +334,14 @@ func (c *OperatorDebugCommand) Run(args []string) int {
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }

var duration, interval, output, pprofDuration string
var duration, interval, output, pprofDuration, eventTopic string
var eventIndex int64
var nodeIDs, serverIDs string
var allowStale bool

flags.StringVar(&duration, "duration", "2m", "")
flags.Int64Var(&eventIndex, "event-index", 0, "")
flags.StringVar(&eventTopic, "event-topic", "none", "")
flags.StringVar(&interval, "interval", "30s", "")
flags.StringVar(&c.logLevel, "log-level", "DEBUG", "")
flags.IntVar(&c.maxNodes, "max-nodes", 10, "")
Expand All @@ -319,6 +351,7 @@ func (c *OperatorDebugCommand) Run(args []string) int {
flags.BoolVar(&allowStale, "stale", false, "")
flags.StringVar(&output, "output", "", "")
flags.StringVar(&pprofDuration, "pprof-duration", "1s", "")
flags.BoolVar(&c.verbose, "verbose", false, "")

c.consul = &external{tls: &api.TLSConfig{}}
flags.StringVar(&c.consul.addrVal, "consul-http-addr", os.Getenv("CONSUL_HTTP_ADDR"), "")
Expand Down Expand Up @@ -375,6 +408,21 @@ func (c *OperatorDebugCommand) Run(args []string) int {
}
c.pprofDuration = pd

// Parse event stream topic filter
t, err := topicsFromString(eventTopic)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error parsing event topics: %v", err))
return 1
}
c.topics = t

// Validate and set initial event stream index
if eventIndex < 0 {
c.Ui.Error("Event stream index must be greater than zero")
return 1
}
c.index = uint64(eventIndex)

// Verify there are no extra arguments
args = flags.Args()
if l := len(args); l != 0 {
Expand Down Expand Up @@ -550,6 +598,9 @@ func (c *OperatorDebugCommand) Run(args []string) int {
if c.pprofDuration.Seconds() != 1 {
c.Ui.Output(fmt.Sprintf(" pprof Duration: %s", c.pprofDuration))
}
if c.topics != nil {
c.Ui.Output(fmt.Sprintf(" Event topics: %+v", c.topics))
}
c.Ui.Output("")
c.Ui.Output("Capturing cluster data...")

Expand Down Expand Up @@ -584,8 +635,11 @@ func (c *OperatorDebugCommand) Run(args []string) int {

// collect collects data from our endpoints and writes the archive bundle
func (c *OperatorDebugCommand) collect(client *api.Client) error {
// Collect cluster data
// Start background captures
c.startMonitors(client)
c.startEventStream(client)

// Collect cluster data
self, err := client.Agent().Self()
c.writeJSON(clusterDir, "agent-self.json", self, err)

Expand All @@ -611,7 +665,6 @@ func (c *OperatorDebugCommand) collect(client *api.Client) error {
c.collectAgentHosts(client)
c.collectPprofs(client)

c.startMonitors(client)
c.collectPeriodic(client)

return nil
Expand Down Expand Up @@ -686,6 +739,103 @@ func (c *OperatorDebugCommand) startMonitor(path, idKey, nodeID string, client *
}
}

// captureEventStream wraps the event stream capture process.
func (c *OperatorDebugCommand) startEventStream(client *api.Client) {
c.verboseOut("Launching eventstream goroutine...")

go func() {
if err := c.captureEventStream(client); err != nil {
var es string
if mErr, ok := err.(*multierror.Error); ok {
es = multierror.ListFormatFunc(mErr.Errors)
} else {
es = err.Error()
}

c.Ui.Error(fmt.Sprintf("Error capturing event stream: %s", es))
}
}()
}

func (c *OperatorDebugCommand) captureEventStream(client *api.Client) error {
// Ensure output directory is present
path := clusterDir
if err := c.mkdir(c.path(path)); err != nil {
return err
}

// Create the output file
fh, err := os.Create(c.path(path, "eventstream.json"))
if err != nil {
return err
}
defer fh.Close()

// Get handle to events endpoint
events := client.EventStream()

// Start streaming events
eventCh, err := events.Stream(c.ctx, c.topics, c.index, c.queryOpts())
if err != nil {
if errors.Is(err, context.Canceled) {
c.verboseOut("Event stream canceled: No events captured")
return nil
}
return fmt.Errorf("failed to stream events: %w", err)
}

eventCount := 0
errCount := 0
heartbeatCount := 0
channelEventCount := 0

var mErrs *multierror.Error

for {
select {
case event := <-eventCh:
channelEventCount++
if event.Err != nil {
errCount++
c.verboseOutf("error from event stream: index; %d err: %v", event.Index, event.Err)
mErrs = multierror.Append(mErrs, fmt.Errorf("error at index: %d, Err: %w", event.Index, event.Err))
break
}

if event.IsHeartbeat() {
heartbeatCount++
continue
}

for _, e := range event.Events {
eventCount++
c.verboseOutf("Event: %4d, Index: %d, Topic: %-10s, Type: %s, FilterKeys: %s", eventCount, e.Index, e.Topic, e.Type, e.FilterKeys)

bytes, err := json.Marshal(e)
if err != nil {
errCount++
mErrs = multierror.Append(mErrs, fmt.Errorf("failed to marshal json from Topic: %s, Type: %s, Err: %w", e.Topic, e.Type, err))
}

n, err := fh.Write(bytes)
if err != nil {
errCount++
mErrs = multierror.Append(mErrs, fmt.Errorf("failed to write bytes to eventstream.json; bytes written: %d, Err: %w", n, err))
break
}
n, err = fh.WriteString("\n")
if err != nil {
errCount++
mErrs = multierror.Append(mErrs, fmt.Errorf("failed to write string to eventstream.json; chars written: %d, Err: %w", n, err))
}
}
case <-c.ctx.Done():
c.verboseOutf("Event stream captured %d events, %d frames, %d heartbeats, %d errors", eventCount, channelEventCount, heartbeatCount, errCount)
return mErrs.ErrorOrNil()
}
}
}

// collectAgentHosts calls collectAgentHost for each selected node
func (c *OperatorDebugCommand) collectAgentHosts(client *api.Client) {
for _, n := range c.nodeIDs {
Expand Down Expand Up @@ -1192,6 +1342,16 @@ func (c *OperatorDebugCommand) trap() {
}()
}

func (c *OperatorDebugCommand) verboseOut(out string) {
if c.verbose {
c.Ui.Output(out)
}
}

func (c *OperatorDebugCommand) verboseOutf(format string, a ...interface{}) {
c.verboseOut(fmt.Sprintf(format, a...))
}

// TarCZF like the tar command, recursively builds a gzip compressed tar
// archive from a directory. If not empty, all files in the bundle are prefixed
// with the target path.
Expand Down Expand Up @@ -1312,6 +1472,63 @@ func stringToSlice(input string) []string {
return out
}

func parseEventTopics(topicList []string) (map[api.Topic][]string, error) {
topics := make(map[api.Topic][]string)

var mErrs *multierror.Error

for _, topic := range topicList {
k, v, err := parseTopic(topic)
if err != nil {
mErrs = multierror.Append(mErrs, err)
}

topics[api.Topic(k)] = append(topics[api.Topic(k)], v)
}

return topics, mErrs.ErrorOrNil()
}

func parseTopic(input string) (string, string, error) {
var topic, filter string

parts := strings.Split(input, ":")
switch len(parts) {
case 1:
// infer wildcard if only given a topic
topic = input
filter = "*"
case 2:
topic = parts[0]
filter = parts[1]
default:
return "", "", fmt.Errorf("Invalid key value pair for topic: %s", topic)
}

return strings.Title(topic), filter, nil
}

func allTopics() map[api.Topic][]string {
return map[api.Topic][]string{"*": {"*"}}
}

// topicsFromString parses a comma separated list into a topicMap
func topicsFromString(topicList string) (map[api.Topic][]string, error) {
if topicList == "none" {
return nil, nil
}
if topicList == "all" {
return allTopics(), nil
}

topics := stringToSlice(topicList)
topicMap, err := parseEventTopics(topics)
if err != nil {
return nil, err
}
return topicMap, nil
}

// external holds address configuration for Consul and Vault APIs
type external struct {
tls *api.TLSConfig
Expand Down
Loading

0 comments on commit 8d28bfe

Please sign in to comment.