From b23932d50bd19a886c3a50e6db196f69b05588f3 Mon Sep 17 00:00:00 2001 From: davemay99 Date: Tue, 26 Oct 2021 13:03:18 -0400 Subject: [PATCH 01/13] Add Event Stream capture to nomad operator debug --- command/operator_debug.go | 198 ++++++++++++++++++++++++++++++++- command/operator_debug_test.go | 189 +++++++++++++++++++++++++++++++ 2 files changed, 381 insertions(+), 6 deletions(-) diff --git a/command/operator_debug.go b/command/operator_debug.go index 903cde95aa3..e98dc0a4a13 100644 --- a/command/operator_debug.go +++ b/command/operator_debug.go @@ -6,6 +6,7 @@ import ( "context" "crypto/tls" "encoding/json" + "errors" "flag" "fmt" "html/template" @@ -21,6 +22,7 @@ import ( "time" "github.com/hashicorp/go-cleanhttp" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/api/contexts" "github.com/hashicorp/nomad/helper" @@ -42,12 +44,14 @@ type OperatorDebugCommand struct { nodeClass string nodeIDs []string serverIDs []string + topics map[api.Topic][]string consul *external vault *external manifest []string ctx context.Context cancel context.CancelFunc opts *api.QueryOptions + verbose bool } const ( @@ -72,6 +76,11 @@ Usage: nomad operator debug [options] 'list-jobs' capability for all namespaces. To collect pprof profiles the token will also require 'agent:write', or enable_debug configuration set to true. + + If event stream capture is enabled, the Job, Allocation, Deployment, + and Evaluation topics require 'namespace:read-job' capabilities, the Node + topic requires 'node:read'. A 'management' token is required to capture + ACLToken, ACLPolicy, or all all events. General Options: @@ -137,7 +146,7 @@ Debug Options: -duration= Set the duration of the debug capture. Logs will be captured from specified servers and - nodes at "log-level". Defaults to 2m. + nodes at "log-level". Defaults to 2m. -interval= The interval between snapshots of the Nomad state. Set interval equal to @@ -172,8 +181,15 @@ Debug Options: necessary to get the configuration from a non-leader server. -output= - Path to the parent directory of the output directory. If specified, no - archive is built. Defaults to the current directory. + Path to the parent directory of the output directory. If specified, no + archive is built. Defaults to the current directory. + + -event-topic=: + Enable event stream capture. Filter by comma delimited list of topic filters + or "all". Defaults to "none" (disabled). + + -verbose + Enable verbose output ` return strings.TrimSpace(helpText) } @@ -196,6 +212,8 @@ func (c *OperatorDebugCommand) AutocompleteFlags() complete.Flags { "-pprof-duration": complete.PredictAnything, "-consul-token": complete.PredictAnything, "-vault-token": complete.PredictAnything, + "-event-topic": complete.PredictAnything, + "-verbose": complete.PredictAnything, }) } @@ -305,7 +323,7 @@ func (c *OperatorDebugCommand) Run(args []string) int { flags := c.Meta.FlagSet(c.Name(), FlagSetClient) flags.Usage = func() { c.Ui.Output(c.Help()) } - var duration, interval, output, pprofDuration string + var duration, interval, output, pprofDuration, eventTopic string var nodeIDs, serverIDs string var allowStale bool @@ -319,6 +337,8 @@ func (c *OperatorDebugCommand) Run(args []string) int { flags.BoolVar(&allowStale, "stale", false, "") flags.StringVar(&output, "output", "", "") flags.StringVar(&pprofDuration, "pprof-duration", "1s", "") + flags.StringVar(&eventTopic, "event-topic", "none", "") + flags.BoolVar(&c.verbose, "verbose", false, "") c.consul = &external{tls: &api.TLSConfig{}} flags.StringVar(&c.consul.addrVal, "consul-http-addr", os.Getenv("CONSUL_HTTP_ADDR"), "") @@ -375,6 +395,14 @@ func (c *OperatorDebugCommand) Run(args []string) int { } c.pprofDuration = pd + // Parse event stream topic filter + t, err := topicsFromString(eventTopic) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error parsing event topics: %v", err)) + return 1 + } + c.topics = t + // Verify there are no extra arguments args = flags.Args() if l := len(args); l != 0 { @@ -550,6 +578,9 @@ func (c *OperatorDebugCommand) Run(args []string) int { if c.pprofDuration.Seconds() != 1 { c.Ui.Output(fmt.Sprintf(" pprof Duration: %s", c.pprofDuration)) } + if c.topics != nil { + c.Ui.Output(fmt.Sprintf(" Event topics: %+v", c.topics)) + } c.Ui.Output("") c.Ui.Output("Capturing cluster data...") @@ -584,8 +615,11 @@ func (c *OperatorDebugCommand) Run(args []string) int { // collect collects data from our endpoints and writes the archive bundle func (c *OperatorDebugCommand) collect(client *api.Client) error { - // Collect cluster data + // Start background captures + c.startMonitors(client) + c.startEventStream(clusterDir, c.topics, 0, client) // TODO: allow index as cmdline arg + // Collect cluster data self, err := client.Agent().Self() c.writeJSON(clusterDir, "agent-self.json", self, err) @@ -611,7 +645,6 @@ func (c *OperatorDebugCommand) collect(client *api.Client) error { c.collectAgentHosts(client) c.collectPprofs(client) - c.startMonitors(client) c.collectPeriodic(client) return nil @@ -686,6 +719,102 @@ func (c *OperatorDebugCommand) startMonitor(path, idKey, nodeID string, client * } } +// captureEventStream wraps the event stream capture process. +func (c *OperatorDebugCommand) startEventStream(path string, topics map[api.Topic][]string, index uint64, client *api.Client) { + c.verboseOut("Launching eventstream goroutine...") + + go func() { + if err := c.captureEventStream(path, topics, index, client); err != nil { + var es string + if mErr, ok := err.(*multierror.Error); ok { + es = multierror.ListFormatFunc(mErr.Errors) + } else { + es = err.Error() + } + + c.Ui.Error(fmt.Sprintf("Error capturing event stream: %s", es)) + } + }() +} + +func (c *OperatorDebugCommand) captureEventStream(path string, topicMap map[api.Topic][]string, index uint64, client *api.Client) error { + // Ensure output directory is present + if err := c.mkdir(c.path(path)); err != nil { + return err + } + + // Create the output file + fh, err := os.Create(c.path(path, "eventstream.json")) + if err != nil { + return err + } + defer fh.Close() + + // Get handle to events endpoint + events := client.EventStream() + + // Start streaming events + eventCh, err := events.Stream(c.ctx, topicMap, index, c.queryOpts()) + if err != nil { + if errors.Is(err, context.Canceled) { + c.verboseOut("Event stream canceled: No events captured") + return nil + } + return fmt.Errorf("failed to stream events: %w", err) + } + + eventCount := 0 + errCount := 0 + heartbeatCount := 0 + channelEventCount := 0 + + var mErrs *multierror.Error + + for { + select { + case event := <-eventCh: + channelEventCount++ + if event.Err != nil { + errCount++ + c.verboseOutf("error from event stream: index; %d err: %v", event.Index, event.Err) + mErrs = multierror.Append(mErrs, fmt.Errorf("error at index: %d, Err: %w", event.Index, event.Err)) + break + } + + if event.IsHeartbeat() { + heartbeatCount++ + continue + } + + for _, e := range event.Events { + eventCount++ + c.verboseOutf("Event: %4d, Index: %d, Topic: %-10s, Type: %s, FilterKeys: %s", eventCount, e.Index, e.Topic, e.Type, e.FilterKeys) + + bytes, err := json.Marshal(e) + if err != nil { + errCount++ + mErrs = multierror.Append(mErrs, fmt.Errorf("failed to marshal json from Topic: %s, Type: %s, Err: %w", e.Topic, e.Type, err)) + } + + n, err := fh.Write(bytes) + if err != nil { + errCount++ + mErrs = multierror.Append(mErrs, fmt.Errorf("failed to write bytes to eventstream.json; bytes written: %d, Err: %w", n, err)) + break + } + n, err = fh.WriteString("\n") + if err != nil { + errCount++ + mErrs = multierror.Append(mErrs, fmt.Errorf("failed to write string to eventstream.json; chars written: %d, Err: %w", n, err)) + } + } + case <-c.ctx.Done(): + c.verboseOutf("Event stream captured %d events, %d frames, %d heartbeats, %d errors", eventCount, channelEventCount, heartbeatCount, errCount) + return mErrs.ErrorOrNil() + } + } +} + // collectAgentHosts calls collectAgentHost for each selected node func (c *OperatorDebugCommand) collectAgentHosts(client *api.Client) { for _, n := range c.nodeIDs { @@ -1192,6 +1321,16 @@ func (c *OperatorDebugCommand) trap() { }() } +func (c *OperatorDebugCommand) verboseOut(out string) { + if c.verbose { + c.Ui.Output(out) + } +} + +func (c *OperatorDebugCommand) verboseOutf(format string, a ...interface{}) { + c.verboseOut(fmt.Sprintf(format, a...)) +} + // TarCZF like the tar command, recursively builds a gzip compressed tar // archive from a directory. If not empty, all files in the bundle are prefixed // with the target path. @@ -1312,6 +1451,53 @@ func stringToSlice(input string) []string { return out } +func parseEventTopics(topicList []string) (map[api.Topic][]string, error) { + topics := make(map[api.Topic][]string) + + for _, topic := range topicList { + k, v, err := parseTopic(topic) + if err != nil { + return nil, fmt.Errorf("error parsing topics: %w", err) + } + + topics[api.Topic(k)] = append(topics[api.Topic(k)], v) + } + + return topics, nil +} + +func parseTopic(topic string) (string, string, error) { + parts := strings.Split(topic, ":") + // infer wildcard if only given a topic + if len(parts) == 1 { + return topic, "*", nil + } else if len(parts) != 2 { + return "", "", fmt.Errorf("Invalid key value pair for topic, topic: %s", topic) + } + return parts[0], parts[1], nil +} + +func allTopics() map[api.Topic][]string { + return map[api.Topic][]string{"*": {"*"}} +} + +// topicsFromString parses a comma separated list into a topicMap +func topicsFromString(topicList string) (map[api.Topic][]string, error) { + if topicList == "none" { + return nil, nil + } + if topicList == "all" { + return allTopics(), nil + } + + topics := stringToSlice(topicList) + topicMap, err := parseEventTopics(topics) + if err != nil { + return nil, err + } + return topicMap, nil +} + // external holds address configuration for Consul and Vault APIs type external struct { tls *api.TLSConfig diff --git a/command/operator_debug_test.go b/command/operator_debug_test.go index 537ead9cff9..91aacf85491 100644 --- a/command/operator_debug_test.go +++ b/command/operator_debug_test.go @@ -7,6 +7,7 @@ import ( "net/http/httptest" "os" "path/filepath" + "regexp" "strings" "testing" "time" @@ -73,6 +74,8 @@ func newClientAgentConfigFunc(region string, nodeClass string, srvRPCAddr string } func TestDebug_NodeClass(t *testing.T) { + t.Parallel() + // Start test server and API client srv, _, url := testServer(t, false, nil) @@ -121,6 +124,8 @@ func TestDebug_NodeClass(t *testing.T) { } func TestDebug_ClientToServer(t *testing.T) { + t.Parallel() + // Start test server and API client srv, _, url := testServer(t, false, nil) @@ -264,6 +269,8 @@ func TestDebug_MultiRegion(t *testing.T) { } func TestDebug_SingleServer(t *testing.T) { + t.Parallel() + srv, _, url := testServer(t, false, nil) testutil.WaitForLeader(t, srv.Agent.RPC) @@ -296,6 +303,8 @@ func TestDebug_SingleServer(t *testing.T) { } func TestDebug_Failures(t *testing.T) { + t.Parallel() + srv, _, url := testServer(t, false, nil) testutil.WaitForLeader(t, srv.Agent.RPC) @@ -330,6 +339,11 @@ func TestDebug_Failures(t *testing.T) { args: []string{"-duration", "5m", "-interval", "10m"}, expectedCode: 1, }, + { + name: "Fails bad pprof duration", + args: []string{"-pprof-duration", "baz"}, + expectedCode: 1, + }, { name: "Fails bad address", args: []string{"-address", url + "bogus"}, @@ -342,6 +356,8 @@ func TestDebug_Failures(t *testing.T) { } func TestDebug_Bad_CSIPlugin_Names(t *testing.T) { + t.Parallel() + // Start test server and API client srv, _, url := testServer(t, false, nil) @@ -391,6 +407,7 @@ func buildPathSlice(path string, files []string) []string { } func TestDebug_CapturedFiles(t *testing.T) { + // t.Parallel() srv, _, url := testServer(t, true, nil) testutil.WaitForLeader(t, srv.Agent.RPC) @@ -500,6 +517,8 @@ func TestDebug_CapturedFiles(t *testing.T) { } func TestDebug_ExistingOutput(t *testing.T) { + t.Parallel() + ui := cli.NewMockUi() cmd := &OperatorDebugCommand{Meta: Meta{Ui: ui}} @@ -515,6 +534,8 @@ func TestDebug_ExistingOutput(t *testing.T) { } func TestDebug_Fail_Pprof(t *testing.T) { + t.Parallel() + // Setup agent config with debug endpoints disabled agentConfFunc := func(c *agent.Config) { c.EnableDebug = false @@ -823,3 +844,171 @@ func testServerWithoutLeader(t *testing.T, runClient bool, cb func(*agent.Config c := a.Client() return a, c, a.HTTPAddr() } + +// testOutput is used to receive test output from a channel +type testOutput struct { + name string + code int + output string + error string +} + +func TestDebug_EventStream_TopicsFromString(t *testing.T) { + cases := []struct { + name string + topicList string + want map[api.Topic][]string + }{ + { + name: "topics = all", + topicList: "all", + want: allTopics(), + }, + { + name: "topics = none", + topicList: "none", + want: nil, + }, + { + name: "two topics", + topicList: "Deployment,Job", + want: map[api.Topic][]string{ + "Deployment": {"*"}, + "Job": {"*"}, + }, + }, + { + name: "multiple topics and filters (using api const)", + topicList: "Evaluation:example,Job:*,Node:*", + want: map[api.Topic][]string{ + api.TopicEvaluation: {"example"}, + api.TopicJob: {"*"}, + api.TopicNode: {"*"}, + }, + }, + { + name: "all topics for filterKey", + topicList: "*:example", + want: map[api.Topic][]string{ + "*": {"example"}, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got, err := topicsFromString(tc.topicList) + require.NoError(t, err) + require.Equal(t, tc.want, got) + }) + } +} + +func TestDebug_EventStream(t *testing.T) { + // TODO: specify output directory to allow inspection of eventstream.json + // TODO: require specific events in the eventstream.json file(s) + // TODO: scenario where no events are expected, verify "No events captured" + // TODO: verify event topic filtering only includes expected events + + var start time.Time + + // Start test server + srv, client, url := testServer(t, true, nil) + t.Logf("[TEST] %s: test server started, waiting for leadership to establish\n", time.Since(start)) + + // Ensure leader is ready + testutil.WaitForLeader(t, srv.Agent.RPC) + t.Logf("[TEST] %s: Leadership established\n", time.Since(start)) + + // Setup mock UI + ui := cli.NewMockUi() + cmd := &OperatorDebugCommand{Meta: Meta{Ui: ui}} + + // Create channels to pass info back from goroutine + chOutput := make(chan testOutput) + chDone := make(chan bool) + + // Set duration for capture + duration := 5 * time.Second + // Fail with timeout if duration is exceeded by 5 seconds + timeout := duration + 5*time.Second + + // Run debug in a goroutine so we can start the capture before we run the test job + t.Logf("[TEST] %s: Starting nomad operator debug in goroutine\n", time.Since(start)) + go func() { + code := cmd.Run([]string{"-address", url, "-duration", duration.String(), "-interval", "5s", "-event-topic", "Job:*"}) + assert.Equal(t, 0, code) + + chOutput <- testOutput{ + name: "yo", + code: code, + output: ui.OutputWriter.String(), + error: ui.ErrorWriter.String(), + } + chDone <- true + }() + + // Start test job + t.Logf("[TEST] %s: Running test job\n", time.Since(start)) + job := testJob("event_stream_test") + resp, _, err := client.Jobs().Register(job, nil) + t.Logf("[TEST] %s: Test job started\n", time.Since(start)) + + // Ensure job registered + require.NoError(t, err) + + // Wait for the job to complete + if code := waitForSuccess(ui, client, fullId, t, resp.EvalID); code != 0 { + switch code { + case 1: + t.Fatalf("status code 1: All other failures (API connectivity, internal errors, etc)\n") + case 2: + t.Fatalf("status code 2: Problem scheduling job (impossible constraints, resources exhausted, etc)\n") + default: + t.Fatalf("status code non zero saw %d\n", code) + } + } + t.Logf("[TEST] %s: test job is complete, eval id: %s\n", time.Since(start), resp.EvalID) + + // Capture the output struct from nomad operator debug goroutine + var testOut testOutput + var done bool + for { + select { + case testOut = <-chOutput: + t.Logf("out from channel testout\n") + case done = <-chDone: + t.Logf("[TEST] %s: goroutine is complete", time.Since(start)) + case <-time.After(timeout): + t.Fatalf("timed out waiting for event stream event (duration: %s, timeout: %s", duration, timeout) + } + + if done { + break + } + } + + t.Logf("Values from struct -- code: %d, len(out): %d, len(outerr): %d\n", testOut.code, len(testOut.output), len(testOut.error)) + + require.Empty(t, testOut.error) + + archive := extractArchiveName(testOut.output) + require.NotEmpty(t, archive) + fmt.Println(archive) + + // TODO: verify evenstream.json output file contains expected content +} + +// extractArchiveName searches string s for the archive filename +func extractArchiveName(captureOutput string) string { + file := "" + + r := regexp.MustCompile(`Created debug archive: (.+)?\n`) + res := r.FindStringSubmatch(captureOutput) + // If found, there will be 2 elements, where element [1] is the desired text from the submatch + if len(res) == 2 { + file = res[1] + } + + return file +} From e0798772543ccb0f0f190278d49edb87447ce3cf Mon Sep 17 00:00:00 2001 From: Dave May Date: Sun, 16 Jan 2022 22:01:48 -0500 Subject: [PATCH 02/13] Add event stream documentation --- website/content/docs/commands/operator/debug.mdx | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/website/content/docs/commands/operator/debug.mdx b/website/content/docs/commands/operator/debug.mdx index 579b4c77b2e..d6507ee7221 100644 --- a/website/content/docs/commands/operator/debug.mdx +++ b/website/content/docs/commands/operator/debug.mdx @@ -75,6 +75,13 @@ true. leadership, it may be necessary to get the configuration from a non-leader server. +- `-event-topic=:`: Enable event + stream capture. Filter by comma delimited list of topic filters or "all". + Defaults to "none" (disabled). Refer to the [Events API](/api-docs/events) for + additional detail. + +- `-verbose`: Enable verbose output + - `-output=path`: Path to the parent directory of the output directory. Defaults to the current directory. If specified, no archive is built. From de009d4565f878c9ecfdc322267452feeb1db3cb Mon Sep 17 00:00:00 2001 From: Dave May Date: Sun, 16 Jan 2022 22:19:39 -0500 Subject: [PATCH 03/13] Add changelog entry --- .changelog/11865.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/11865.txt diff --git a/.changelog/11865.txt b/.changelog/11865.txt new file mode 100644 index 00000000000..a5e05cab47a --- /dev/null +++ b/.changelog/11865.txt @@ -0,0 +1,3 @@ +```release-note:improvement +cli: Add event stream capture to `nomad operator debug` +``` \ No newline at end of file From 05bcf7d718ae57851d22ddc9a2d7d16e5e0fc618 Mon Sep 17 00:00:00 2001 From: Dave May Date: Mon, 17 Jan 2022 12:40:39 -0500 Subject: [PATCH 04/13] Update helptext --- command/operator_debug.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/command/operator_debug.go b/command/operator_debug.go index e98dc0a4a13..4d1e16bc721 100644 --- a/command/operator_debug.go +++ b/command/operator_debug.go @@ -184,12 +184,16 @@ Debug Options: Path to the parent directory of the output directory. If specified, no archive is built. Defaults to the current directory. - -event-topic=: - Enable event stream capture. Filter by comma delimited list of topic filters - or "all". Defaults to "none" (disabled). + -event-topic=: + Enable event stream capture, filtered by comma delimited list of topic filters. + Examples: + "all" or "*:*" for all events + "Evaluation" or "Evaluation:*" for all evaluation events + "*:example" for all events related to the job "example" + Defaults to "none" (disabled). -verbose - Enable verbose output + Enable verbose output. ` return strings.TrimSpace(helpText) } From 1fd478fb835b89979059820ee0a039b7d9f1cf75 Mon Sep 17 00:00:00 2001 From: Dave May Date: Mon, 17 Jan 2022 12:43:56 -0500 Subject: [PATCH 05/13] Simplify function signatures --- command/operator_debug.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/command/operator_debug.go b/command/operator_debug.go index 4d1e16bc721..b1baf50ee69 100644 --- a/command/operator_debug.go +++ b/command/operator_debug.go @@ -45,6 +45,7 @@ type OperatorDebugCommand struct { nodeIDs []string serverIDs []string topics map[api.Topic][]string + index uint64 consul *external vault *external manifest []string @@ -406,6 +407,7 @@ func (c *OperatorDebugCommand) Run(args []string) int { return 1 } c.topics = t + c.index = 0 // Verify there are no extra arguments args = flags.Args() @@ -621,7 +623,7 @@ func (c *OperatorDebugCommand) Run(args []string) int { func (c *OperatorDebugCommand) collect(client *api.Client) error { // Start background captures c.startMonitors(client) - c.startEventStream(clusterDir, c.topics, 0, client) // TODO: allow index as cmdline arg + c.startEventStream(client) // Collect cluster data self, err := client.Agent().Self() @@ -724,11 +726,11 @@ func (c *OperatorDebugCommand) startMonitor(path, idKey, nodeID string, client * } // captureEventStream wraps the event stream capture process. -func (c *OperatorDebugCommand) startEventStream(path string, topics map[api.Topic][]string, index uint64, client *api.Client) { +func (c *OperatorDebugCommand) startEventStream(client *api.Client) { c.verboseOut("Launching eventstream goroutine...") go func() { - if err := c.captureEventStream(path, topics, index, client); err != nil { + if err := c.captureEventStream(client); err != nil { var es string if mErr, ok := err.(*multierror.Error); ok { es = multierror.ListFormatFunc(mErr.Errors) @@ -741,8 +743,9 @@ func (c *OperatorDebugCommand) startEventStream(path string, topics map[api.Topi }() } -func (c *OperatorDebugCommand) captureEventStream(path string, topicMap map[api.Topic][]string, index uint64, client *api.Client) error { +func (c *OperatorDebugCommand) captureEventStream(client *api.Client) error { // Ensure output directory is present + path := clusterDir if err := c.mkdir(c.path(path)); err != nil { return err } @@ -758,7 +761,7 @@ func (c *OperatorDebugCommand) captureEventStream(path string, topicMap map[api. events := client.EventStream() // Start streaming events - eventCh, err := events.Stream(c.ctx, topicMap, index, c.queryOpts()) + eventCh, err := events.Stream(c.ctx, c.topics, c.index, c.queryOpts()) if err != nil { if errors.Is(err, context.Canceled) { c.verboseOut("Event stream canceled: No events captured") From ed99180538242eb263e5d68b25483e4a12cc6af1 Mon Sep 17 00:00:00 2001 From: Dave May Date: Mon, 17 Jan 2022 17:30:54 -0500 Subject: [PATCH 06/13] Tag myself on TODO items --- command/operator_debug.go | 4 ++-- command/operator_debug_test.go | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/command/operator_debug.go b/command/operator_debug.go index b1baf50ee69..b020f0400d9 100644 --- a/command/operator_debug.go +++ b/command/operator_debug.go @@ -248,7 +248,7 @@ func NodePredictor(factory ApiClientFactory) complete.Predictor { } // NodeClassPredictor returns a client node class predictor -// TODO: Consider API options for node class filtering +// TODO dmay: Consider API options for node class filtering func NodeClassPredictor(factory ApiClientFactory) complete.Predictor { return complete.PredictFunc(func(a complete.Args) []string { client, err := factory() @@ -284,7 +284,7 @@ func NodeClassPredictor(factory ApiClientFactory) complete.Predictor { } // ServerPredictor returns a server member predictor -// TODO: Consider API options for server member filtering +// TODO dmay: Consider API options for server member filtering func ServerPredictor(factory ApiClientFactory) complete.Predictor { return complete.PredictFunc(func(a complete.Args) []string { client, err := factory() diff --git a/command/operator_debug_test.go b/command/operator_debug_test.go index 91aacf85491..04b8053f779 100644 --- a/command/operator_debug_test.go +++ b/command/operator_debug_test.go @@ -905,10 +905,10 @@ func TestDebug_EventStream_TopicsFromString(t *testing.T) { } func TestDebug_EventStream(t *testing.T) { - // TODO: specify output directory to allow inspection of eventstream.json - // TODO: require specific events in the eventstream.json file(s) - // TODO: scenario where no events are expected, verify "No events captured" - // TODO: verify event topic filtering only includes expected events + // TODO dmay: specify output directory to allow inspection of eventstream.json + // TODO dmay: require specific events in the eventstream.json file(s) + // TODO dmay: scenario where no events are expected, verify "No events captured" + // TODO dmay: verify event topic filtering only includes expected events var start time.Time @@ -996,7 +996,7 @@ func TestDebug_EventStream(t *testing.T) { require.NotEmpty(t, archive) fmt.Println(archive) - // TODO: verify evenstream.json output file contains expected content + // TODO dmay: verify evenstream.json output file contains expected content } // extractArchiveName searches string s for the archive filename From 29e926ef2f8b246712ba2ba237ab332ea4a86d8b Mon Sep 17 00:00:00 2001 From: Dave May Date: Mon, 17 Jan 2022 20:46:31 -0500 Subject: [PATCH 07/13] Return all topic errors at once --- command/operator_debug.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/command/operator_debug.go b/command/operator_debug.go index b020f0400d9..e7c675baf56 100644 --- a/command/operator_debug.go +++ b/command/operator_debug.go @@ -1461,16 +1461,18 @@ func stringToSlice(input string) []string { func parseEventTopics(topicList []string) (map[api.Topic][]string, error) { topics := make(map[api.Topic][]string) + var mErrs *multierror.Error + for _, topic := range topicList { k, v, err := parseTopic(topic) if err != nil { - return nil, fmt.Errorf("error parsing topics: %w", err) + mErrs = multierror.Append(mErrs, fmt.Errorf("error parsing topics: %w", err)) } topics[api.Topic(k)] = append(topics[api.Topic(k)], v) } - return topics, nil + return topics, mErrs.ErrorOrNil() } func parseTopic(topic string) (string, string, error) { From f6bf499e0c3ef01d3fc57f4664ebce4a0e42ce66 Mon Sep 17 00:00:00 2001 From: Dave May Date: Mon, 17 Jan 2022 20:58:41 -0500 Subject: [PATCH 08/13] Add cli argument for event-index --- command/operator_debug.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/command/operator_debug.go b/command/operator_debug.go index e7c675baf56..71a2a3e907e 100644 --- a/command/operator_debug.go +++ b/command/operator_debug.go @@ -149,6 +149,11 @@ Debug Options: Set the duration of the debug capture. Logs will be captured from specified servers and nodes at "log-level". Defaults to 2m. + -event-index= + Specifies the index to start streaming events from. If the requested index is + no longer in the buffer the stream will start at the next available index. + Defaults to 0. + -interval= The interval between snapshots of the Nomad state. Set interval equal to duration to capture a single snapshot. Defaults to 30s. @@ -207,6 +212,7 @@ func (c *OperatorDebugCommand) AutocompleteFlags() complete.Flags { return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient), complete.Flags{ "-duration": complete.PredictAnything, + "-event-index": complete.PredictAnything, "-interval": complete.PredictAnything, "-log-level": complete.PredictSet("TRACE", "DEBUG", "INFO", "WARN", "ERROR"), "-max-nodes": complete.PredictAnything, @@ -329,10 +335,12 @@ func (c *OperatorDebugCommand) Run(args []string) int { flags.Usage = func() { c.Ui.Output(c.Help()) } var duration, interval, output, pprofDuration, eventTopic string + var eventIndex int64 var nodeIDs, serverIDs string var allowStale bool flags.StringVar(&duration, "duration", "2m", "") + flags.Int64Var(&eventIndex, "event-index", 0, "") flags.StringVar(&interval, "interval", "30s", "") flags.StringVar(&c.logLevel, "log-level", "DEBUG", "") flags.IntVar(&c.maxNodes, "max-nodes", 10, "") @@ -407,7 +415,13 @@ func (c *OperatorDebugCommand) Run(args []string) int { return 1 } c.topics = t - c.index = 0 + + // Validate and set initial event stream index + if eventIndex < 0 { + c.Ui.Error(fmt.Sprintf("Event stream index must be greater than zero")) + return 1 + } + c.index = uint64(eventIndex) // Verify there are no extra arguments args = flags.Args() From ef7c50b884bea97d8fbb16f9c9eeb27d187f4c8b Mon Sep 17 00:00:00 2001 From: Dave May Date: Mon, 17 Jan 2022 20:59:45 -0500 Subject: [PATCH 09/13] Cleanup event parsing errors --- command/operator_debug.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/command/operator_debug.go b/command/operator_debug.go index 71a2a3e907e..3b3de8be70e 100644 --- a/command/operator_debug.go +++ b/command/operator_debug.go @@ -1480,7 +1480,7 @@ func parseEventTopics(topicList []string) (map[api.Topic][]string, error) { for _, topic := range topicList { k, v, err := parseTopic(topic) if err != nil { - mErrs = multierror.Append(mErrs, fmt.Errorf("error parsing topics: %w", err)) + mErrs = multierror.Append(mErrs, err) } topics[api.Topic(k)] = append(topics[api.Topic(k)], v) @@ -1495,7 +1495,7 @@ func parseTopic(topic string) (string, string, error) { if len(parts) == 1 { return topic, "*", nil } else if len(parts) != 2 { - return "", "", fmt.Errorf("Invalid key value pair for topic, topic: %s", topic) + return "", "", fmt.Errorf("Invalid key value pair for topic: %s", topic) } return parts[0], parts[1], nil } From d27e38105c4be8ee2938c585107150fa94716ad3 Mon Sep 17 00:00:00 2001 From: Dave May Date: Mon, 17 Jan 2022 21:00:59 -0500 Subject: [PATCH 10/13] Reorder event-topic --- command/operator_debug.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/command/operator_debug.go b/command/operator_debug.go index 3b3de8be70e..b2104d86fc7 100644 --- a/command/operator_debug.go +++ b/command/operator_debug.go @@ -78,9 +78,9 @@ Usage: nomad operator debug [options] token will also require 'agent:write', or enable_debug configuration set to true. - If event stream capture is enabled, the Job, Allocation, Deployment, - and Evaluation topics require 'namespace:read-job' capabilities, the Node - topic requires 'node:read'. A 'management' token is required to capture + If event stream capture is enabled, the Job, Allocation, Deployment, + and Evaluation topics require 'namespace:read-job' capabilities, the Node + topic requires 'node:read'. A 'management' token is required to capture ACLToken, ACLPolicy, or all all events. General Options: @@ -154,6 +154,14 @@ Debug Options: no longer in the buffer the stream will start at the next available index. Defaults to 0. + -event-topic=: + Enable event stream capture, filtered by comma delimited list of topic filters. + Examples: + "all" or "*:*" for all events + "Evaluation" or "Evaluation:*" for all evaluation events + "*:example" for all events related to the job "example" + Defaults to "none" (disabled). + -interval= The interval between snapshots of the Nomad state. Set interval equal to duration to capture a single snapshot. Defaults to 30s. @@ -187,17 +195,9 @@ Debug Options: necessary to get the configuration from a non-leader server. -output= - Path to the parent directory of the output directory. If specified, no + Path to the parent directory of the output directory. If specified, no archive is built. Defaults to the current directory. - -event-topic=: - Enable event stream capture, filtered by comma delimited list of topic filters. - Examples: - "all" or "*:*" for all events - "Evaluation" or "Evaluation:*" for all evaluation events - "*:example" for all events related to the job "example" - Defaults to "none" (disabled). - -verbose Enable verbose output. ` @@ -212,6 +212,7 @@ func (c *OperatorDebugCommand) AutocompleteFlags() complete.Flags { return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient), complete.Flags{ "-duration": complete.PredictAnything, + "-event-topic": complete.PredictAnything, "-event-index": complete.PredictAnything, "-interval": complete.PredictAnything, "-log-level": complete.PredictSet("TRACE", "DEBUG", "INFO", "WARN", "ERROR"), @@ -223,7 +224,6 @@ func (c *OperatorDebugCommand) AutocompleteFlags() complete.Flags { "-pprof-duration": complete.PredictAnything, "-consul-token": complete.PredictAnything, "-vault-token": complete.PredictAnything, - "-event-topic": complete.PredictAnything, "-verbose": complete.PredictAnything, }) } @@ -341,6 +341,7 @@ func (c *OperatorDebugCommand) Run(args []string) int { flags.StringVar(&duration, "duration", "2m", "") flags.Int64Var(&eventIndex, "event-index", 0, "") + flags.StringVar(&eventTopic, "event-topic", "none", "") flags.StringVar(&interval, "interval", "30s", "") flags.StringVar(&c.logLevel, "log-level", "DEBUG", "") flags.IntVar(&c.maxNodes, "max-nodes", 10, "") @@ -350,7 +351,6 @@ func (c *OperatorDebugCommand) Run(args []string) int { flags.BoolVar(&allowStale, "stale", false, "") flags.StringVar(&output, "output", "", "") flags.StringVar(&pprofDuration, "pprof-duration", "1s", "") - flags.StringVar(&eventTopic, "event-topic", "none", "") flags.BoolVar(&c.verbose, "verbose", false, "") c.consul = &external{tls: &api.TLSConfig{}} From 329136a6c5af0e9f470f94af4daefefc363657d5 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Mon, 17 Jan 2022 20:57:37 -0500 Subject: [PATCH 11/13] automatically capitalize event stream topics when using the `nomad operator debug` command --- command/operator_debug.go | 22 +++++++++++++++------- command/operator_debug_test.go | 9 +++++++++ 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/command/operator_debug.go b/command/operator_debug.go index b2104d86fc7..04a549092e5 100644 --- a/command/operator_debug.go +++ b/command/operator_debug.go @@ -1489,15 +1489,23 @@ func parseEventTopics(topicList []string) (map[api.Topic][]string, error) { return topics, mErrs.ErrorOrNil() } -func parseTopic(topic string) (string, string, error) { - parts := strings.Split(topic, ":") - // infer wildcard if only given a topic - if len(parts) == 1 { - return topic, "*", nil - } else if len(parts) != 2 { +func parseTopic(input string) (string, string, error) { + var topic, filter string + + parts := strings.Split(input, ":") + switch len(parts) { + case 1: + // infer wildcard if only given a topic + topic = input + filter = "*" + case 2: + topic = parts[0] + filter = parts[1] + default: return "", "", fmt.Errorf("Invalid key value pair for topic: %s", topic) } - return parts[0], parts[1], nil + + return strings.Title(topic), filter, nil } func allTopics() map[api.Topic][]string { diff --git a/command/operator_debug_test.go b/command/operator_debug_test.go index 04b8053f779..caeb3f8142f 100644 --- a/command/operator_debug_test.go +++ b/command/operator_debug_test.go @@ -886,6 +886,15 @@ func TestDebug_EventStream_TopicsFromString(t *testing.T) { api.TopicNode: {"*"}, }, }, + { + name: "capitalize topics", + topicList: "evaluation:example,job:*,node:*", + want: map[api.Topic][]string{ + api.TopicEvaluation: {"example"}, + api.TopicJob: {"*"}, + api.TopicNode: {"*"}, + }, + }, { name: "all topics for filterKey", topicList: "*:example", From 8cc304b6175da7bc92a628f499358299ded46041 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Mon, 17 Jan 2022 21:09:12 -0500 Subject: [PATCH 12/13] minor fixes --- command/operator_debug.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/command/operator_debug.go b/command/operator_debug.go index 04a549092e5..7cf04f7662f 100644 --- a/command/operator_debug.go +++ b/command/operator_debug.go @@ -77,7 +77,7 @@ Usage: nomad operator debug [options] 'list-jobs' capability for all namespaces. To collect pprof profiles the token will also require 'agent:write', or enable_debug configuration set to true. - + If event stream capture is enabled, the Job, Allocation, Deployment, and Evaluation topics require 'namespace:read-job' capabilities, the Node topic requires 'node:read'. A 'management' token is required to capture @@ -153,7 +153,7 @@ Debug Options: Specifies the index to start streaming events from. If the requested index is no longer in the buffer the stream will start at the next available index. Defaults to 0. - + -event-topic=: Enable event stream capture, filtered by comma delimited list of topic filters. Examples: @@ -212,8 +212,8 @@ func (c *OperatorDebugCommand) AutocompleteFlags() complete.Flags { return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient), complete.Flags{ "-duration": complete.PredictAnything, - "-event-topic": complete.PredictAnything, "-event-index": complete.PredictAnything, + "-event-topic": complete.PredictAnything, "-interval": complete.PredictAnything, "-log-level": complete.PredictSet("TRACE", "DEBUG", "INFO", "WARN", "ERROR"), "-max-nodes": complete.PredictAnything, From d185255c168e9642f980cc73e634fbff4078adb3 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Mon, 17 Jan 2022 21:26:19 -0500 Subject: [PATCH 13/13] fix linter --- command/operator_debug.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/command/operator_debug.go b/command/operator_debug.go index 7cf04f7662f..446ddc03eda 100644 --- a/command/operator_debug.go +++ b/command/operator_debug.go @@ -418,7 +418,7 @@ func (c *OperatorDebugCommand) Run(args []string) int { // Validate and set initial event stream index if eventIndex < 0 { - c.Ui.Error(fmt.Sprintf("Event stream index must be greater than zero")) + c.Ui.Error("Event stream index must be greater than zero") return 1 } c.index = uint64(eventIndex)