diff --git a/pkg/cmd/roachtest/test.go b/pkg/cmd/roachtest/test.go
index 4511f9cb9332..bbda480edf3f 100644
--- a/pkg/cmd/roachtest/test.go
+++ b/pkg/cmd/roachtest/test.go
@@ -32,11 +32,13 @@ import (
 	"sync"
 	"time"
 
+	version "github.com/hashicorp/go-version"
+	"github.com/petermattis/goid"
+
+	"github.com/cockroachdb/cockroach/pkg/cmd/internal/issues"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
-	version "github.com/hashicorp/go-version"
-	"github.com/petermattis/goid"
 )
 
 var (
@@ -620,6 +622,153 @@ func (t *test) IsBuildVersion(minVersion string) bool {
 	return !t.registry.buildVersion.LessThan(vers)
 }
 
+// runTests decides the initial resource allocation for each top-level test
+// (i.e. how many clusters it gets) and then delegates to runTopLevelTest.
+func (r *registry) runTests(ctx context.Context, filter []string, count, parallelism int) {
+	// !!! deal with local
+
+	filterRE := makeFilterRE(filter)
+	topLevelTests := r.List(filterRE)
+	n := len(topLevelTests)
+	if parallelism > n*count {
+		parallelism = n * count
+	}
+	sem := make(chan struct{}, parallelism)
+	clustersPerTLT := parallelism / n
+	wg := sync.WaitGroup{}
+	wg.Add(n)
+	for i, spec := range topLevelTests {
+		numClusters := clustersPerTLT
+		if i < parallelism%n {
+			numClusters++
+		}
+		// Acquire slots from sem, to serve as the initial cluster pool to be used
+		// by this test. Actual cluster creation and destruction happens in
+		// runTopLevelTest(), as top-level tests determine a cluster's spec.
+		for j := 0; j < numClusters; j++ {
+			sem <- struct{}{}
+		}
+		go r.runTopLevelTest(ctx, *spec, filterRE, count, sem, numClusters, &wg)
+	}
+	// Wait for all tests to finish.
+	wg.Wait()
+}
+
+type clusterFuture chan *cluster
+
+func makeClusterFuture() clusterFuture {
+	return clusterFuture(make(chan *cluster, 1))
+}
+
+func (f clusterFuture) Put(c *cluster) {
+	f <- c
+}
+
+func (f clusterFuture) Get() *cluster {
+	c := <-f
+	// The cluster stays in the future so Get can be called again.
+	f <- c
+	return c
+}
+
+// runTopLevelTest runs a top-level test, and its subtests, count times. A
+// number of clusters will be created and then reused, subject to sem.
+// reservedSlots represents the number of tokens that have been acquired for us
+// from sem by the caller. So, we start off by creating reservedSlots clusters
+// and then, as long as we need more, we create them when sem permits. Once a
+// cluster is created, it is not released until count is exhausted (even when
+// we go above reservedSlots and other tests are competing for sem).
+// Before returning, reservedSlots tokens are released to sem.
+//
+// Args:
+// filter: A filter to be passed to subtests. The top-level test is assumed to
+//   need running, but some subtests might be excluded.
+// wg: A WaitGroup that will be signaled when done.
+func (r *registry) runTopLevelTest(
+	ctx context.Context,
+	spec testSpec,
+	filter *regexp.Regexp,
+	count int,
+	sem chan struct{},
+	reservedSlots int,
+	wg *sync.WaitGroup,
+) {
+
+	// !!! deal with Skip
+	// !!! deal with debugEnabled
+
+	if reservedSlots > count {
+		log.Fatalf(ctx, "more reservedSlots (%d) than count (%d)", reservedSlots, count)
+	}
+
+	// createClusterAsync creates a cluster according to this test's spec. Once
+	// the cluster is started, the future is signaled.
+	createClusterAsync := func() clusterFuture {
+		f := makeClusterFuture()
+		go func() {
+			artifactsDir := filepath.Join(artifacts, teamCityNameEscape(spec.Name))
+			t := &test{
+				spec:         &spec,
+				registry:     r,
+				artifactsDir: artifactsDir,
+			}
+			f.Put(newCluster(ctx, t, spec.Nodes))
+		}()
+		return f
+	}
+
+	clusters := make(chan clusterFuture, reservedSlots)
+	// Create reservedSlots clusters asynchronously.
+	for i := 0; i < reservedSlots; i++ {
+		clusters <- createClusterAsync()
+	}
+
+	for run := 0; run < count; {
+		// Wait until there's a cluster available for us, or a sem slot for
+		// creating a new cluster. We prefer already-existing clusters to creating
+		// new ones, hence the funky nested select structure.
+		var future clusterFuture
+		select {
+		case future = <-clusters:
+		default:
+			select {
+			case future = <-clusters:
+			case sem <- struct{}{}:
+				clusters <- createClusterAsync()
+				// Account for the new cluster in reservedSlots, so that we release
+				// the sem slot at the end.
+				reservedSlots++
+				// Now we loop around, but next time around something will be
+				// available in the clusters channel.
+				continue
+			}
+		}
+		c := future.Get() // Wait until the cluster we selected has been started.
+
+		run++ // We're about to run a test.
+		runNum := run
+		if count == 1 {
+			runNum = 0
+		}
+		r.runAsync(ctx, &spec, filter, nil /* parent */, c, runNum, func(_ bool) {
+			// Return the cluster to the pool.
+			future := makeClusterFuture()
+			future.Put(c)
+			clusters <- future
+		})
+	}
+	// After we've started executing all the iterations, we now drain the clusters
+	// that we've created and we release their sem slots. By doing this, we're
+	// also waiting for all the runs to finish.
+	for i := 0; i < reservedSlots; i++ {
+		future := <-clusters
+		c := future.Get()
+		c.Destroy(ctx)
+		<-sem
+	}
+	wg.Done()
+}
+
 // runAsync starts a goroutine that runs a test. If the test has subtests,
 // runAsync will be invoked recursively, but in a blocking manner.
 //
@@ -788,20 +937,8 @@ func (r *registry) runAsync(
 	}
 
 	if !dryrun {
-		if c == nil {
-			c = newCluster(ctx, t, t.spec.Nodes)
-			if c != nil {
-				defer func() {
-					if !debugEnabled || !t.Failed() {
-						c.Destroy(ctx)
-					} else {
-						c.l.Printf("not destroying cluster to allow debugging\n")
-					}
-				}()
-			}
-		} else {
-			c = c.clone(t)
-		}
+		c = c.clone(t)
+		c.Wipe(ctx)
 	}
 
 	// If we have subtests, handle them here and return.
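
Not part of the patch: the sketch below is a minimal, self-contained illustration of the two channel idioms runTopLevelTest relies on, namely a capacity-one buffered channel acting as a re-readable future (Put once, Get many times) and the nested select that prefers an already-created resource over acquiring a sem slot to create a new one. All names here (future, pool, sem, create) are illustrative and do not come from the roachtest code.

package main

import (
	"fmt"
	"sync"
)

// future is a re-readable future: Put is called once; Get can be called many
// times because it writes the value back into the capacity-one channel.
type future chan int

func makeFuture() future   { return make(chan int, 1) }
func (f future) Put(v int) { f <- v }
func (f future) Get() int  { v := <-f; f <- v; return v }

func main() {
	const count = 5               // total runs to execute
	sem := make(chan struct{}, 2) // at most 2 resources may ever be created
	pool := make(chan future, count)

	nextID := 0
	create := func() future {
		f := makeFuture()
		id := nextID
		nextID++
		go f.Put(id) // simulate asynchronous creation
		return f
	}

	var wg sync.WaitGroup
	for run := 0; run < count; {
		var f future
		// Prefer a resource that already exists in the pool; otherwise, if sem
		// permits, create a new one and loop around to pick it up from the pool.
		select {
		case f = <-pool:
		default:
			select {
			case f = <-pool:
			case sem <- struct{}{}:
				pool <- create()
				continue
			}
		}
		id := f.Get() // wait until the resource has finished being created
		run++
		wg.Add(1)
		go func(run, id int) {
			defer wg.Done()
			fmt.Printf("run %d using resource %d\n", run, id)
			// Return the resource to the pool so another run can reuse it.
			g := makeFuture()
			g.Put(id)
			pool <- g
		}(run, id)
	}
	wg.Wait()
}

As in the patch, a resource is created at most once per sem slot and is then reused across runs; the patch additionally drains the pool at the end to destroy clusters and release the sem slots.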