From bf86880cd94fc20a26095316a61abb1a594c27e8 Mon Sep 17 00:00:00 2001 From: Renato Costa Date: Mon, 17 Jul 2023 10:02:04 -0400 Subject: [PATCH 1/4] roachprod: return structured data from `roachprod.Monitor` Previously, the `Monitor` function would return semi-structured data in the form of a `(node, message)` pair. The `message` field corresponded to the output (as in, stdout) of the bash script that implements the monitor logic. This tied callers to the specific implementation of the function and to the strings passed to `echo` in that script (e.g., all the checks for the `"dead"` string in the message). This commit updates the function to instead return an event channel. Events can be of different types corresponding to each of the events that the shell script is able to emit (e.g., when the cockroach process is found to be running, when it stops, etc). This makes the parsing of the script output private, and simplifies the logic in most callers. The message returned by the `roachtest` wrapper is also changed slightly in this commit to make it clearer where the error is coming from: the function passed to `monitor.Go` or the monitor process itself. This has been a source of confusion in the past. Finally, the monitor implementation in roachprod is changed slightly to avoid blocking on channel sends if the context has already been canceled. This avoids leaked goroutines in tests, as canceling the context passed to `Monitor()` should cause all goroutines to eventually finish instead of blocking indefinitely. Epic: CRDB-19321 Release note: None --- pkg/cmd/roachprod/main.go | 16 +-- pkg/cmd/roachtest/cluster.go | 12 +-- pkg/cmd/roachtest/monitor.go | 33 +++--- pkg/roachprod/install/cluster_synced.go | 137 +++++++++++++++++++----- pkg/roachprod/install/session.go | 1 - 5 files changed, 136 insertions(+), 63 deletions(-) diff --git a/pkg/cmd/roachprod/main.go b/pkg/cmd/roachprod/main.go index 07b754f5f47f..23b83af174d9 100644 --- a/pkg/cmd/roachprod/main.go +++ b/pkg/cmd/roachprod/main.go @@ -650,21 +650,15 @@ of nodes, outputting a line whenever a change is detected: `, Args: cobra.ExactArgs(1), Run: wrap(func(cmd *cobra.Command, args []string) error { - messages, err := roachprod.Monitor(context.Background(), config.Logger, args[0], monitorOpts) + eventChan, err := roachprod.Monitor(context.Background(), config.Logger, args[0], monitorOpts) if err != nil { return err } - for msg := range messages { - if msg.Err != nil { - msg.Msg += "error: " + msg.Err.Error() - } - thisError := errors.Newf("%d: %s", msg.Node, msg.Msg) - if msg.Err != nil || strings.Contains(msg.Msg, "dead") { - err = errors.CombineErrors(err, thisError) - } - fmt.Println(thisError.Error()) + for info := range eventChan { + fmt.Println(info.String()) } - return err + + return nil }), } diff --git a/pkg/cmd/roachtest/cluster.go b/pkg/cmd/roachtest/cluster.go index 87e70aa2e17c..43e9a27abb4e 100644 --- a/pkg/cmd/roachtest/cluster.go +++ b/pkg/cmd/roachtest/cluster.go @@ -1528,7 +1528,7 @@ func (c *clusterImpl) assertNoDeadNode(ctx context.Context, t test.Test) error { } t.L().Printf("checking for dead nodes") - ch, err := roachprod.Monitor(ctx, t.L(), c.name, install.MonitorOpts{OneShot: true, IgnoreEmptyNodes: true}) + eventsCh, err := roachprod.Monitor(ctx, t.L(), c.name, install.MonitorOpts{OneShot: true, IgnoreEmptyNodes: true}) // An error here means there was a problem initialising a SyncedCluster. if err != nil { @@ -1536,14 +1536,12 @@ func (c *clusterImpl) assertNoDeadNode(ctx context.Context, t test.Test) error { } deadNodes := 0 - for n := range ch { - // If there's an error, it means either that the monitor command failed - // completely, or that it found a dead node worth complaining about. - if n.Err != nil || strings.HasPrefix(n.Msg, "dead") { + for info := range eventsCh { + t.L().Printf("%s", info) + + if _, isDeath := info.Event.(install.MonitorNodeDead); isDeath { deadNodes++ } - - t.L().Printf("n%d: err=%v,msg=%s", n.Node, n.Err, n.Msg) } if deadNodes > 0 { diff --git a/pkg/cmd/roachtest/monitor.go b/pkg/cmd/roachtest/monitor.go index 297faac0ff22..3278207bc016 100644 --- a/pkg/cmd/roachtest/monitor.go +++ b/pkg/cmd/roachtest/monitor.go @@ -13,7 +13,6 @@ package main import ( "context" "fmt" - "strings" "sync" "sync/atomic" @@ -178,7 +177,7 @@ func (m *monitorImpl) wait() error { m.cancel() wg.Done() }() - setErr(errors.Wrap(m.g.Wait(), "monitor task failed")) + setErr(errors.Wrap(m.g.Wait(), "function passed to monitor.Go failed")) }() // 2. The second goroutine reads from the monitoring channel, watching for any @@ -190,28 +189,24 @@ func (m *monitorImpl) wait() error { wg.Done() }() - messagesChannel, err := roachprod.Monitor(m.ctx, m.l, m.nodes, install.MonitorOpts{}) + eventsCh, err := roachprod.Monitor(m.ctx, m.l, m.nodes, install.MonitorOpts{}) if err != nil { setErr(errors.Wrap(err, "monitor command failure")) return } - var monitorErr error - for msg := range messagesChannel { - if msg.Err != nil { - msg.Msg += "error: " + msg.Err.Error() - } - thisError := errors.Newf("%d: %s", msg.Node, msg.Msg) - if msg.Err != nil || strings.Contains(msg.Msg, "dead") { - monitorErr = errors.CombineErrors(monitorErr, thisError) + + for info := range eventsCh { + _, isDeath := info.Event.(install.MonitorNodeDead) + isExpectedDeath := isDeath && atomic.AddInt32(&m.expDeaths, -1) >= 0 + var expectedDeathStr string + if isExpectedDeath { + expectedDeathStr = ": expected" } - var id int - var s string - newMsg := thisError.Error() - if n, _ := fmt.Sscanf(newMsg, "%d: %s", &id, &s); n == 2 { - if strings.Contains(s, "dead") && atomic.AddInt32(&m.expDeaths, -1) < 0 { - setErr(errors.Wrap(fmt.Errorf("unexpected node event: %s", newMsg), "monitor command failure")) - return - } + m.l.Printf("Monitor event: %s%s", info, expectedDeathStr) + + if isDeath && !isExpectedDeath { + setErr(fmt.Errorf("unexpected node event: %s", info)) + return } } }() diff --git a/pkg/roachprod/install/cluster_synced.go b/pkg/roachprod/install/cluster_synced.go index c35291ce9743..734306ea14cf 100644 --- a/pkg/roachprod/install/cluster_synced.go +++ b/pkg/roachprod/install/cluster_synced.go @@ -583,20 +583,52 @@ fi return results, nil } +// MonitorNodeSkipped represents a node whose status was not checked. +type MonitorNodeSkipped struct{} + +// MonitorNodeRunning represents the cockroach process running on a +// node. +type MonitorNodeRunning struct { + PID string +} + +// MonitorNodeDead represents the cockroach process dying on a node. +type MonitorNodeDead struct { + ExitCode string +} + +type MonitorError struct { + Err error +} + // NodeMonitorInfo is a message describing a cockroach process' status. type NodeMonitorInfo struct { // The index of the node (in a SyncedCluster) at which the message originated. Node Node - // A message about the node. This is either a PID, "dead", "nc exited", or - // "skipped". - // Anything but a PID or "skipped" is an indication that there is some - // problem with the node and that the process is not running. - Msg string - // Err is an error that may occur when trying to probe the status of the node. - // If Err is non-nil, Msg is empty. After an error is returned, the node with - // the given index will no longer be probed. Errors typically indicate networking - // issues or nodes that have (physically) shut down. - Err error + // Event describes what happened to the node; it is one of + // MonitorNodeSkipped (no store directory was found); + // MonitorNodeRunning, sent when cockroach is running on a node; + // MonitorNodeDead, when the cockroach process stops running on a + // node; or MonitorError, typically indicate networking issues + // or nodes that have (physically) shut down. + Event interface{} +} + +func (nmi NodeMonitorInfo) String() string { + var status string + + switch event := nmi.Event.(type) { + case MonitorNodeRunning: + status = fmt.Sprintf("cockroach process is running (PID: %s)", event.PID) + case MonitorNodeSkipped: + status = "node skipped" + case MonitorNodeDead: + status = fmt.Sprintf("cockroach process died (exit code %s)", event.ExitCode) + case MonitorError: + status = fmt.Sprintf("error: %s", event.Err.Error()) + } + + return fmt.Sprintf("n%d: %s", nmi.Node, status) } // MonitorOpts is used to pass the options needed by Monitor. @@ -606,16 +638,16 @@ type MonitorOpts struct { } // Monitor writes NodeMonitorInfo for the cluster nodes to the returned channel. -// Infos sent to the channel always have the Index and exactly one of Msg or Err -// set. +// Infos sent to the channel always have the Node the event refers to, and the +// event itself. See documentation for NodeMonitorInfo for possible event types. // -// If oneShot is true, infos are retrieved only once for each node and the +// If OneShot is true, infos are retrieved only once for each node and the // channel is subsequently closed; otherwise the process continues indefinitely // (emitting new information as the status of the cockroach process changes). // -// If ignoreEmptyNodes is true, nodes on which no CockroachDB data is found -// (in {store-dir}) will not be probed and single message, "skipped", will -// be emitted for them. +// If IgnoreEmptyNodes is true, nodes on which no CockroachDB data is found +// (in {store-dir}) will not be probed and single event, MonitorNodeSkipped, +// will be emitted for them. func (c *SyncedCluster) Monitor( l *logger.Logger, ctx context.Context, opts MonitorOpts, ) chan NodeMonitorInfo { @@ -624,10 +656,30 @@ func (c *SyncedCluster) Monitor( var wg sync.WaitGroup monitorCtx, cancel := context.WithCancel(ctx) + // sendEvent sends the NodeMonitorInfo passed through the channel + // that is listened to by the caller. Bails if the context is + // canceled. + sendEvent := func(info NodeMonitorInfo) { + select { + case ch <- info: + // We were able to send the info through the channel. + case <-monitorCtx.Done(): + // Don't block trying to send the info. + } + } + + const ( + separator = "|" + skippedMsg = "skipped" + runningMsg = "running" + deadMsg = "dead" + ) + for i := range nodes { wg.Add(1) go func(i int) { defer wg.Done() + node := nodes[i] // On each monitored node, we loop looking for a cockroach process. @@ -637,18 +689,30 @@ func (c *SyncedCluster) Monitor( Store string Port int Local bool + Separator string + SkippedMsg string + RunningMsg string + DeadMsg string }{ OneShot: opts.OneShot, IgnoreEmpty: opts.IgnoreEmptyNodes, Store: c.NodeDir(node, 1 /* storeIndex */), Port: c.NodePort(node), Local: c.IsLocal(), + Separator: separator, + SkippedMsg: skippedMsg, + RunningMsg: runningMsg, + DeadMsg: deadMsg, } + // NB.: we parse the output of every line this script + // prints. Every call to `echo` must match the parsing logic + // down below in order to produce structured results to the + // caller. snippet := ` {{ if .IgnoreEmpty }} if ! ls {{.Store}}/marker.* 1> /dev/null 2>&1; then - echo "skipped" + echo "{{.SkippedMsg}}" exit 0 fi {{- end}} @@ -682,10 +746,10 @@ while :; do # the new incarnation. We lost the actual exit status of the old PID. status="unknown" fi - echo "dead (exit status ${status})" + echo "{{.DeadMsg}}{{.Separator}}${status}" fi if [ "${pid}" != 0 ]; then - echo "${pid}" + echo "{{.RunningMsg}}{{.Separator}}${pid}" fi lastpid=${pid} fi @@ -704,7 +768,8 @@ done t := template.Must(template.New("script").Parse(snippet)) var buf bytes.Buffer if err := t.Execute(&buf, data); err != nil { - ch <- NodeMonitorInfo{Node: node, Err: err} + err := errors.Wrap(err, "failed to execute template") + sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}}) return } @@ -713,14 +778,16 @@ done p, err := sess.StdoutPipe() if err != nil { - ch <- NodeMonitorInfo{Node: node, Err: err} + err := errors.Wrap(err, "failed to read stdout pipe") + sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}}) wg.Done() return } // Request a PTY so that the script will receive a SIGPIPE when the // session is closed. if err := sess.RequestPty(); err != nil { - ch <- NodeMonitorInfo{Node: node, Err: err} + err := errors.Wrap(err, "failed to request PTY") + sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}}) return } @@ -734,12 +801,31 @@ done if err == io.EOF { return } - ch <- NodeMonitorInfo{Node: node, Msg: string(line)} + if err != nil { + err := errors.Wrap(err, "error reading from session") + sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}}) + } + + parts := strings.Split(string(line), separator) + switch parts[0] { + case skippedMsg: + sendEvent(NodeMonitorInfo{Node: node, Event: MonitorNodeSkipped{}}) + case runningMsg: + pid := parts[1] + sendEvent(NodeMonitorInfo{Node: node, Event: MonitorNodeRunning{pid}}) + case deadMsg: + exitCode := parts[1] + sendEvent(NodeMonitorInfo{Node: node, Event: MonitorNodeDead{exitCode}}) + default: + err := fmt.Errorf("internal error: unrecognized output from monitor: %s", line) + sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}}) + } } }(p) if err := sess.Start(); err != nil { - ch <- NodeMonitorInfo{Node: node, Err: err} + err := errors.Wrap(err, "failed to start session") + sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}}) return } @@ -755,7 +841,8 @@ done // pipe. Otherwise it can be closed under us, causing the reader to loop // infinitely receiving a non-`io.EOF` error. if err := sess.Wait(); err != nil { - ch <- NodeMonitorInfo{Node: node, Err: err} + err := errors.Wrap(err, "failed to wait for session") + sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}}) return } }(i) diff --git a/pkg/roachprod/install/session.go b/pkg/roachprod/install/session.go index 0dfd72b31dcf..60dc9d1d6cc0 100644 --- a/pkg/roachprod/install/session.go +++ b/pkg/roachprod/install/session.go @@ -112,7 +112,6 @@ func newRemoteSession(l *logger.Logger, command *remoteCommand) *remoteSession { } } - //const logfile = "" args := []string{ command.user + "@" + command.host, From f75ebe72e21b9c716dedac66e0713b4114419706 Mon Sep 17 00:00:00 2001 From: Renato Costa Date: Mon, 24 Jul 2023 15:19:38 -0400 Subject: [PATCH 2/4] roachtest: monitor nodes in mixedversion This updates the `mixedversion` runner to monitor the status of the cockroach processes on the cluster during a mixed-version test. This allows the test to fail immediately if a node crashes unexpectedly. Previously, if a node crashed, the test would only fail if the crash resulted in an assertion failure. There is a chance that this would not happen because the framework could decide to restart the node shortly after its crash. With this change, an unexpected node death causes the test to fail immediately. This commit also exposes the ability for test authors to use `ExpectDeath()` in their tests, in case the test performs its own restarts or chaos events. Epic: CRDB-19321 Release note: None --- pkg/cmd/roachtest/monitor.go | 34 ++- .../mixedversion/mixedversion.go | 1 + .../roachtestutil/mixedversion/runner.go | 208 ++++++++++++++---- .../roachtestutil/mixedversion/runner_test.go | 2 +- .../roachtest/tests/mixed_version_backup.go | 5 +- 5 files changed, 198 insertions(+), 52 deletions(-) diff --git a/pkg/cmd/roachtest/monitor.go b/pkg/cmd/roachtest/monitor.go index 3278207bc016..908e9fd3b865 100644 --- a/pkg/cmd/roachtest/monitor.go +++ b/pkg/cmd/roachtest/monitor.go @@ -31,11 +31,13 @@ type monitorImpl struct { Failed() bool WorkerStatus(...interface{}) } - l *logger.Logger - nodes string - ctx context.Context - cancel func() - g *errgroup.Group + l *logger.Logger + nodes string + ctx context.Context + cancel func() + g *errgroup.Group + + numTasks int32 // atomically expDeaths int32 // atomically } @@ -79,6 +81,8 @@ func (m *monitorImpl) ResetDeaths() { var errTestFatal = errors.New("t.Fatal() was called") func (m *monitorImpl) Go(fn func(context.Context) error) { + atomic.AddInt32(&m.numTasks, 1) + m.g.Go(func() (err error) { defer func() { r := recover() @@ -170,15 +174,21 @@ func (m *monitorImpl) wait() error { } // 1. The first goroutine waits for the worker errgroup to exit. + // Note that this only happens if the caller created at least one + // task for the monitor. This check enables the roachtest monitor to + // be used in cases where we just want to monitor events in the + // cluster without running any background tasks through the monitor. var wg sync.WaitGroup - wg.Add(1) - go func() { - defer func() { - m.cancel() - wg.Done() + if atomic.LoadInt32(&m.numTasks) > 0 { + wg.Add(1) + go func() { + defer func() { + m.cancel() + wg.Done() + }() + setErr(errors.Wrap(m.g.Wait(), "function passed to monitor.Go failed")) }() - setErr(errors.Wrap(m.g.Wait(), "function passed to monitor.Go failed")) - }() + } // 2. The second goroutine reads from the monitoring channel, watching for any // unexpected death events. diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go index 7caa754eed21..f0e349468623 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go @@ -653,6 +653,7 @@ func (s restartWithNewBinaryStep) Description() string { func (s restartWithNewBinaryStep) Run( ctx context.Context, l *logger.Logger, c cluster.Cluster, h *Helper, ) error { + h.ExpectDeath() return clusterupgrade.RestartNodesWithNewBinary( ctx, s.rt, diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go index 693dbb838df0..9a70a1f1578d 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go @@ -22,6 +22,7 @@ import ( "runtime/debug" "strconv" "strings" + "sync" "sync/atomic" "time" @@ -61,6 +62,7 @@ type ( group ctxgroup.Group ctx context.Context events chan backgroundEvent + logger *logger.Logger stopFuncs []StopFunc } @@ -81,6 +83,19 @@ type ( clusterVersionsAfter []roachpb.Version } + // crdbMonitor is a thin wrapper around the roachtest monitor API + // (cluster.NewMonitor) that produces error events through a channel + // whenever an unexpected node death happens. It also allows us to + // provide an API for test authors to inform the framework that a + // node death is expected if the test performs its own restarts or + // chaos events. + crdbMonitor struct { + once sync.Once + crdbNodes option.NodeListOption + monitor cluster.Monitor + errCh chan error + } + testRunner struct { ctx context.Context cancel context.CancelFunc @@ -94,6 +109,7 @@ type ( clusterVersions atomic.Value background *backgroundRunner + monitor *crdbMonitor connCache struct { mu syncutil.Mutex @@ -123,24 +139,18 @@ func newTestRunner( logger: l, cluster: c, crdbNodes: crdbNodes, - background: newBackgroundRunner(ctx), + background: newBackgroundRunner(ctx, l), + monitor: newCRDBMonitor(ctx, c, crdbNodes), seed: randomSeed, } } // run implements the test running logic, which boils down to running // each step in sequence. -func (tr *testRunner) run() error { - defer tr.closeConnections() - defer func() { - // Stop background functions explicitly so that the corresponding - // termination is marked `TriggeredByTest` (not necessary for - // correctness, just for clarity). - tr.logger.Printf("stopping background functions") - tr.background.Terminate() - }() - +func (tr *testRunner) run() (retErr error) { stepsErr := make(chan error) + defer func() { tr.teardown(stepsErr, retErr != nil) }() + go func() { defer close(stepsErr) for _, step := range tr.plan.steps { @@ -165,6 +175,9 @@ func (tr *testRunner) run() error { } return fmt.Errorf("background step `%s` returned error: %w", event.Name, event.Err) + + case err := <-tr.monitor.Err(): + return tr.testFailure(err.Error(), tr.logger) } } } @@ -186,6 +199,7 @@ func (tr *testRunner) runStep(ctx context.Context, step testStep) error { if err := tr.refreshClusterVersions(); err != nil { return err } + tr.monitor.Init() } } @@ -242,16 +256,10 @@ func (tr *testRunner) runSingleStep(ctx context.Context, ss singleStep, l *logge tr.logStep(prefix, ss, l) }() - if err := func() (retErr error) { - defer func() { - if r := recover(); r != nil { - l.Printf("panic stack trace:\n%s", string(debug.Stack())) - retErr = fmt.Errorf("panic (stack trace above): %v", r) - } - }() + if err := panicAsError(l, func() error { return ss.Run(ctx, l, tr.cluster, tr.newHelper(ctx, l)) - }(); err != nil { - if isContextCanceled(err) { + }); err != nil { + if isContextCanceled(ctx) { l.Printf("step terminated (context canceled)") // Avoid creating a `stepError` (which involves querying binary // and cluster versions) when the context was canceled as the @@ -335,6 +343,36 @@ func (tr *testRunner) testFailure(desc string, l *logger.Logger) error { return tf } +// teardown groups together all tasks that happen once a test finishes. +func (tr *testRunner) teardown(stepsChan chan error, testFailed bool) { + if testFailed { + tr.logger.Printf("mixed-version test FAILED") + } else { + tr.logger.Printf("mixed-version test PASSED") + } + + tr.cancel() + + // Stop background functions explicitly so that the corresponding + // termination is marked `TriggeredByTest` (not necessary for + // correctness, just for clarity). + tr.logger.Printf("stopping background functions") + tr.background.Terminate() + + // If the test failed, we wait for any currently running steps to + // return before passing control back to the roachtest + // framework. This achieves a test.log that does not contain any + // test step output once roachtest started to collect failure + // artifacts, which would be confusing. + if testFailed { + tr.logger.Printf("waiting for all steps to finish after context cancelation") + waitForChannel(stepsChan, "test steps", tr.logger) + } + + tr.logger.Printf("closing database connections") + tr.closeConnections() +} + func (tr *testRunner) logStep(prefix string, step singleStep, l *logger.Logger) { dashes := strings.Repeat("-", 10) l.Printf("%[1]s %s (%d): %s %[1]s", dashes, prefix, step.ID(), step.Description()) @@ -462,11 +500,44 @@ func (tr *testRunner) closeConnections() { } } -func newBackgroundRunner(ctx context.Context) *backgroundRunner { +func newCRDBMonitor( + ctx context.Context, c cluster.Cluster, crdbNodes option.NodeListOption, +) *crdbMonitor { + return &crdbMonitor{ + crdbNodes: crdbNodes, + monitor: c.NewMonitor(ctx, crdbNodes), + errCh: make(chan error), + } +} + +// Init must be called once the cluster is initialized and the +// cockroach process is running on the nodes. Init is idempotent. +func (cm *crdbMonitor) Init() { + cm.once.Do(func() { + go func() { + if err := cm.monitor.WaitE(); err != nil { + cm.errCh <- err + } + }() + }) +} + +// Err returns a channel that will receive errors whenever an +// unexpected node death is observed. +func (cm *crdbMonitor) Err() chan error { + return cm.errCh +} + +func (cm *crdbMonitor) ExpectDeaths(n int) { + cm.monitor.ExpectDeaths(int32(n)) +} + +func newBackgroundRunner(ctx context.Context, l *logger.Logger) *backgroundRunner { g := ctxgroup.WithContext(ctx) return &backgroundRunner{ group: g, ctx: ctx, + logger: l, events: make(chan backgroundEvent), } } @@ -481,11 +552,20 @@ func (br *backgroundRunner) Start(name string, fn func(context.Context) error) c var expectedContextCancelation bool br.group.Go(func() error { err := fn(bgCtx) - br.events <- backgroundEvent{ + event := backgroundEvent{ Name: name, Err: err, - TriggeredByTest: err != nil && isContextCanceled(err) && expectedContextCancelation, + TriggeredByTest: err != nil && isContextCanceled(bgCtx) && expectedContextCancelation, } + + select { + case br.events <- event: + // exit goroutine + case <-br.ctx.Done(): + // Test already finished, exit goroutine. + return nil + } + return err }) @@ -503,11 +583,20 @@ func (br *backgroundRunner) Start(name string, fn func(context.Context) error) c // started during the test. This includes background functions created // during test runtime (using `helper.Background()`), as well as // background steps declared in the test setup (using -// `BackgroundFunc`, `Workload`, et al). +// `BackgroundFunc`, `Workload`, et al). Returns when all background +// functions have returned. func (br *backgroundRunner) Terminate() { for _, stop := range br.stopFuncs { stop() } + + doneCh := make(chan error) + go func() { + defer close(doneCh) + _ = br.group.Wait() + }() + + waitForChannel(doneCh, "background functions", br.logger) } func (br *backgroundRunner) CompletedEvents() <-chan backgroundEvent { @@ -571,9 +660,9 @@ func (h *Helper) Background( return fmt.Errorf("failed to create logger for background function %q: %w", name, err) } - err = fn(ctx, bgLogger) + err = panicAsError(bgLogger, func() error { return fn(ctx, bgLogger) }) if err != nil { - if isContextCanceled(err) { + if isContextCanceled(ctx) { return err } @@ -596,6 +685,20 @@ func (h *Helper) BackgroundCommand(cmd string, nodes option.NodeListOption) cont }) } +// ExpectDeath alerts the testing infrastructure that a node is +// expected to die. Regular restarts as part of the mixedversion +// testing are already taken into account. This function should only +// be used by tests that perform their own node restarts or chaos +// events. +func (h *Helper) ExpectDeath() { + h.ExpectDeaths(1) +} + +// ExpectDeaths is the general version of `ExpectDeath()`. +func (h *Helper) ExpectDeaths(n int) { + h.runner.monitor.ExpectDeaths(n) +} + // loggerFor creates a logger instance to be used by background // functions (created by calling `Background` on the helper // instance). It is similar to the logger instances created for @@ -654,6 +757,39 @@ func loadAtomicVersions(v atomic.Value) []roachpb.Version { return v.Load().([]roachpb.Version) } +// panicAsError ensures that the any panics that might happen while +// the function passed runs are captured and returned as regular +// errors. A stack trace is included in the logs when that happens to +// facilitate debugging. +func panicAsError(l *logger.Logger, f func() error) (retErr error) { + defer func() { + if r := recover(); r != nil { + l.Printf("panic stack trace:\n%s", string(debug.Stack())) + retErr = fmt.Errorf("panic (stack trace above): %v", r) + } + }() + return f() +} + +// waitForChannel waits for the given channel `ch` to close; returns +// when that happens. If the channel does not close within 5 minutes, +// the function logs a message and returns. +// +// The main use-case for this function is waiting for user-provided +// hooks to return after the context passed to them is canceled. We +// want to allow some time for them to finish, but we also don't want +// to block indefinitely if a function inadvertently ignores context +// cancelation. +func waitForChannel(ch chan error, desc string, l *logger.Logger) { + maxWait := 5 * time.Minute + select { + case <-ch: + // return + case <-time.After(maxWait): + l.Printf("waited for %s for %s to finish, giving up", maxWait, desc) + } +} + func formatVersions(versions []roachpb.Version) string { var pairs []string for idx, version := range versions { @@ -663,15 +799,13 @@ func formatVersions(versions []roachpb.Version) string { return fmt.Sprintf("[%s]", strings.Join(pairs, ", ")) } -// isContextCanceled returns a boolean indicating whether the error -// given happened because some context was canceled. -func isContextCanceled(err error) bool { - // TODO(renato): unfortunately, we have to resort to string - // comparison here. The most common use case for this function is - // detecting cluster commands that fail when the test context is - // canceled (after test success or failure), and roachtest does not - // return an error that wraps the context cancelation (in other - // words, `errors.Is` doesn't work). Once we fix this behavior, we - // should use structured errors here. - return strings.Contains(err.Error(), context.Canceled.Error()) +// isContextCanceled returns a boolean indicating whether the context +// passed is canceled. +func isContextCanceled(ctx context.Context) bool { + select { + case <-ctx.Done(): + return true + default: + return false + } } diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/runner_test.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner_test.go index 62662f30c359..67ee56977d88 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/runner_test.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner_test.go @@ -61,7 +61,7 @@ func testTestRunner() *testRunner { cancel: cancel, logger: nilLogger, crdbNodes: nodes, - background: newBackgroundRunner(runnerCtx), + background: newBackgroundRunner(runnerCtx, nilLogger), seed: seed, } } diff --git a/pkg/cmd/roachtest/tests/mixed_version_backup.go b/pkg/cmd/roachtest/tests/mixed_version_backup.go index 675d33ba1aa5..5e368d2c11fe 100644 --- a/pkg/cmd/roachtest/tests/mixed_version_backup.go +++ b/pkg/cmd/roachtest/tests/mixed_version_backup.go @@ -1906,7 +1906,7 @@ func (mvb *mixedVersionBackup) verifyBackupCollection( // as the tables we backed up. In addition, we need to wipe the // cluster before attempting a restore. restoredTables = collection.tables - if err := mvb.resetCluster(ctx, l, version); err != nil { + if err := mvb.resetCluster(ctx, l, h, version); err != nil { return err } case *tableBackup: @@ -1972,9 +1972,10 @@ func (mvb *mixedVersionBackup) verifyBackupCollection( // specified version binary. This is done before we attempt restoring a // full cluster backup. func (mvb *mixedVersionBackup) resetCluster( - ctx context.Context, l *logger.Logger, version string, + ctx context.Context, l *logger.Logger, h *mixedversion.Helper, version string, ) error { l.Printf("resetting cluster using version %q", clusterupgrade.VersionMsg(version)) + h.ExpectDeaths(len(mvb.roachNodes)) if err := mvb.cluster.WipeE(ctx, l, true /* preserveCerts */, mvb.roachNodes); err != nil { return fmt.Errorf("failed to wipe cluster: %w", err) } From 7537b9230f7e85fe1fddb3369af51305c62681c8 Mon Sep 17 00:00:00 2001 From: Renato Costa Date: Tue, 25 Jul 2023 11:03:43 -0400 Subject: [PATCH 3/4] roachtest/mixedversion: move helper functions to its own file This makes it easier for test authors to see all helper functions available without having to browse through internal framework code. Epic: CRDB-19321 Release note: None --- .../roachtestutil/mixedversion/BUILD.bazel | 1 + .../roachtestutil/mixedversion/helper.go | 134 ++++++++++++++++++ .../roachtestutil/mixedversion/runner.go | 111 --------------- 3 files changed, 135 insertions(+), 111 deletions(-) create mode 100644 pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel b/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel index 6a308310701c..d8108eb91d90 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "mixedversion", srcs = [ + "helper.go", "mixedversion.go", "planner.go", "runner.go", diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go new file mode 100644 index 000000000000..38e2c4990742 --- /dev/null +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go @@ -0,0 +1,134 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package mixedversion + +import ( + "context" + gosql "database/sql" + "fmt" + "math/rand" + "path" + "strings" + "sync/atomic" + + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" +) + +func (h *Helper) RandomNode(prng *rand.Rand, nodes option.NodeListOption) int { + return nodes[prng.Intn(len(nodes))] +} + +// RandomDB returns a (nodeID, connection) tuple for a randomly picked +// cockroach node according to the parameters passed. +func (h *Helper) RandomDB(prng *rand.Rand, nodes option.NodeListOption) (int, *gosql.DB) { + node := h.RandomNode(prng, nodes) + return node, h.Connect(node) +} + +// QueryRow performs `db.QueryRowContext` on a randomly picked +// database node. The query and the node picked are logged in the logs +// of the step that calls this function. +func (h *Helper) QueryRow(rng *rand.Rand, query string, args ...interface{}) *gosql.Row { + node, db := h.RandomDB(rng, h.runner.crdbNodes) + h.stepLogger.Printf("running SQL statement:\n%s\nArgs: %v\nNode: %d", query, args, node) + return db.QueryRowContext(h.ctx, query, args...) +} + +// Exec performs `db.ExecContext` on a randomly picked database node. +// The query and the node picked are logged in the logs of the step +// that calls this function. +func (h *Helper) Exec(rng *rand.Rand, query string, args ...interface{}) error { + node, db := h.RandomDB(rng, h.runner.crdbNodes) + h.stepLogger.Printf("running SQL statement:\n%s\nArgs: %v\nNode: %d", query, args, node) + _, err := db.ExecContext(h.ctx, query, args...) + return err +} + +func (h *Helper) Connect(node int) *gosql.DB { + return h.runner.conn(node) +} + +// SetContext should be called by steps that need access to the test +// context, as that is only visible to them. +func (h *Helper) SetContext(c *Context) { + h.testContext = c +} + +// Context returns the test context associated with a certain step. It +// is made available for user-functions (see runHookStep). +func (h *Helper) Context() *Context { + return h.testContext +} + +// Background allows test authors to create functions that run in the +// background in mixed-version hooks. +func (h *Helper) Background( + name string, fn func(context.Context, *logger.Logger) error, +) context.CancelFunc { + return h.runner.background.Start(name, func(ctx context.Context) error { + bgLogger, err := h.loggerFor(name) + if err != nil { + return fmt.Errorf("failed to create logger for background function %q: %w", name, err) + } + + err = panicAsError(bgLogger, func() error { return fn(ctx, bgLogger) }) + if err != nil { + if isContextCanceled(ctx) { + return err + } + + desc := fmt.Sprintf("error in background function %s: %s", name, err) + return h.runner.testFailure(desc, bgLogger) + } + + return nil + }) +} + +// BackgroundCommand has the same semantics of `Background()`; the +// command passed will run and the test will fail if the command is +// not successful. +func (h *Helper) BackgroundCommand(cmd string, nodes option.NodeListOption) context.CancelFunc { + desc := fmt.Sprintf("run command: %q", cmd) + return h.Background(desc, func(ctx context.Context, l *logger.Logger) error { + l.Printf("running command `%s` on nodes %v in the background", cmd, nodes) + return h.runner.cluster.RunE(ctx, nodes, cmd) + }) +} + +// ExpectDeath alerts the testing infrastructure that a node is +// expected to die. Regular restarts as part of the mixedversion +// testing are already taken into account. This function should only +// be used by tests that perform their own node restarts or chaos +// events. +func (h *Helper) ExpectDeath() { + h.ExpectDeaths(1) +} + +// ExpectDeaths is the general version of `ExpectDeath()`. +func (h *Helper) ExpectDeaths(n int) { + h.runner.monitor.ExpectDeaths(n) +} + +// loggerFor creates a logger instance to be used by background +// functions (created by calling `Background` on the helper +// instance). It is similar to the logger instances created for +// mixed-version steps, but with the `background_` prefix. +func (h *Helper) loggerFor(name string) (*logger.Logger, error) { + atomic.AddInt64(&h.bgCount, 1) + + fileName := invalidChars.ReplaceAllString(strings.ToLower(name), "") + fileName = fmt.Sprintf("background_%s_%d", fileName, h.bgCount) + fileName = path.Join(logPrefix, fileName) + + return prefixedLogger(h.runner.logger, fileName) +} diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go index 9a70a1f1578d..2b9d00aa04e6 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go @@ -14,7 +14,6 @@ import ( "context" gosql "database/sql" "fmt" - "math/rand" "os" "path" "path/filepath" @@ -603,116 +602,6 @@ func (br *backgroundRunner) CompletedEvents() <-chan backgroundEvent { return br.events } -func (h *Helper) RandomNode(prng *rand.Rand, nodes option.NodeListOption) int { - return nodes[prng.Intn(len(nodes))] -} - -// RandomDB returns a (nodeID, connection) tuple for a randomly picked -// cockroach node according to the parameters passed. -func (h *Helper) RandomDB(prng *rand.Rand, nodes option.NodeListOption) (int, *gosql.DB) { - node := h.RandomNode(prng, nodes) - return node, h.Connect(node) -} - -// QueryRow performs `db.QueryRowContext` on a randomly picked -// database node. The query and the node picked are logged in the logs -// of the step that calls this function. -func (h *Helper) QueryRow(rng *rand.Rand, query string, args ...interface{}) *gosql.Row { - node, db := h.RandomDB(rng, h.runner.crdbNodes) - h.stepLogger.Printf("running SQL statement:\n%s\nArgs: %v\nNode: %d", query, args, node) - return db.QueryRowContext(h.ctx, query, args...) -} - -// Exec performs `db.ExecContext` on a randomly picked database node. -// The query and the node picked are logged in the logs of the step -// that calls this function. -func (h *Helper) Exec(rng *rand.Rand, query string, args ...interface{}) error { - node, db := h.RandomDB(rng, h.runner.crdbNodes) - h.stepLogger.Printf("running SQL statement:\n%s\nArgs: %v\nNode: %d", query, args, node) - _, err := db.ExecContext(h.ctx, query, args...) - return err -} - -func (h *Helper) Connect(node int) *gosql.DB { - return h.runner.conn(node) -} - -// SetContext should be called by steps that need access to the test -// context, as that is only visible to them. -func (h *Helper) SetContext(c *Context) { - h.testContext = c -} - -// Context returns the test context associated with a certain step. It -// is made available for user-functions (see runHookStep). -func (h *Helper) Context() *Context { - return h.testContext -} - -// Background allows test authors to create functions that run in the -// background in mixed-version hooks. -func (h *Helper) Background( - name string, fn func(context.Context, *logger.Logger) error, -) context.CancelFunc { - return h.runner.background.Start(name, func(ctx context.Context) error { - bgLogger, err := h.loggerFor(name) - if err != nil { - return fmt.Errorf("failed to create logger for background function %q: %w", name, err) - } - - err = panicAsError(bgLogger, func() error { return fn(ctx, bgLogger) }) - if err != nil { - if isContextCanceled(ctx) { - return err - } - - desc := fmt.Sprintf("error in background function %s: %s", name, err) - return h.runner.testFailure(desc, bgLogger) - } - - return nil - }) -} - -// BackgroundCommand has the same semantics of `Background()`; the -// command passed will run and the test will fail if the command is -// not successful. -func (h *Helper) BackgroundCommand(cmd string, nodes option.NodeListOption) context.CancelFunc { - desc := fmt.Sprintf("run command: %q", cmd) - return h.Background(desc, func(ctx context.Context, l *logger.Logger) error { - l.Printf("running command `%s` on nodes %v in the background", cmd, nodes) - return h.runner.cluster.RunE(ctx, nodes, cmd) - }) -} - -// ExpectDeath alerts the testing infrastructure that a node is -// expected to die. Regular restarts as part of the mixedversion -// testing are already taken into account. This function should only -// be used by tests that perform their own node restarts or chaos -// events. -func (h *Helper) ExpectDeath() { - h.ExpectDeaths(1) -} - -// ExpectDeaths is the general version of `ExpectDeath()`. -func (h *Helper) ExpectDeaths(n int) { - h.runner.monitor.ExpectDeaths(n) -} - -// loggerFor creates a logger instance to be used by background -// functions (created by calling `Background` on the helper -// instance). It is similar to the logger instances created for -// mixed-version steps, but with the `background_` prefix. -func (h *Helper) loggerFor(name string) (*logger.Logger, error) { - atomic.AddInt64(&h.bgCount, 1) - - fileName := invalidChars.ReplaceAllString(strings.ToLower(name), "") - fileName = fmt.Sprintf("background_%s_%d", fileName, h.bgCount) - fileName = path.Join(logPrefix, fileName) - - return prefixedLogger(h.runner.logger, fileName) -} - func (tf *testFailure) Error() string { if tf.summarized { return tf.description From 3609615ace5f7718cd77bcf474a0be12fa85db9d Mon Sep 17 00:00:00 2001 From: Renato Costa Date: Tue, 25 Jul 2023 11:22:31 -0400 Subject: [PATCH 4/4] roachtest/mixedversion: update README with note about helpers The note about background functions is important to highlight specifically due to potential habit of using existing roachtest APIs. Epic: CRDB-19321 Release note: None --- .../roachtest/roachtestutil/mixedversion/README.md | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/README.md b/pkg/cmd/roachtest/roachtestutil/mixedversion/README.md index 2b7564a9fae1..ae5284ebdf65 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/README.md +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/README.md @@ -255,6 +255,18 @@ for { } ``` +#### Use test helpers + +Every test hook receives as parameter a `*mixedversion.Helper` instance. This helper struct contains convenience functions that make it easy to perform certain operations while automatically implementing some of the best practices described here. For a full list of helpers, check out the [`helper.go`](https://github.com/cockroachdb/cockroach/blob/master/pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go) file in the framework's source tree. + +##### Aside: background functions + +One particularly important helper functionality to highlight relates to the management of functions that need to run in the background during a test. Typically, roachtests are expected to use a [monitor](https://github.com/cockroachdb/cockroach/blob/master/pkg/cmd/roachtest/monitor.go) for that purpose; not only does the monitor protect from panics inadvertently crashing the test, it also preempts their execution (via context cancelation) if a node dies unexpectedly. + +However, if a mixedversion test needs to perform a task in the background, they **must not use the roachtest monitor**. The reason for this is that mixedversion tests are [randomized](#embrace-randomness); as such, user-created monitors would not be able to predict when a death is expected since it does not know, by design, when a node restarts. + +To run functions in the background, use the API provided by the mixedversion framework. Long running tasks that run throughout the test can be defined with `(*mixedversion.Test).BackgroundFunc`. If a test hook needs to perform a task in the background, the `*mixedversion.Helper` instance has a `Background` function that can be used for that purpose. As usual, check the documentation for the public API for more details on usage and behaviour of these functions. + #### Log progress Logging events in the test as it runs can make debugging failures a lot easier. All hooks passed to the `mixedversion` API receive a `*logger.Logger` instance as parameter. **Functions should use that logger instead of the test logger** (`t.L()`). Doing so has two main advantages: @@ -398,6 +410,6 @@ $ COCKROACH_RANDOM_SEED=7357315251706229449 roachtest run --versions-binary-over ### Final Notes -* This is a high level document and does not include API documentation. The `mixedversion` package includes a lot of documentation in the form of source code comments, and that should be the source of truth when it comes to finding out what functionality is available and how to use it. +* This is a high level document and does not include API documentation. The `mixedversion` package includes a lot of documentation in the form of source code comments, and that should be the source of truth when it comes to finding out what functionality is available and how to use it. Most of the public API is in the [`mixedversion.go`](https://github.com/cockroachdb/cockroach/blob/master/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go) and [`helper.go`](https://github.com/cockroachdb/cockroach/blob/master/pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go) files. * For a simple application of the `mixedversion` framework, check out the `acceptance/version-upgrade` roachtest. For a more complex example, see `backup-restore/mixed-version`. * For any other questions, please reach out to `#test-eng`!