roachtest: allow parallel execution of repeated runs
Before this patch, roachtest --parallelism was ignored when running a
single test, and repeated runs of that test did not reuse clusters.
This patch allows parallel execution of the different runs of a single
test, and reuses clusters across those runs.
The structure of running tests is changed: the resources indicated by
--parallelism are now initially divided fairly among top-level tests,
and then each top-level test maintains a pool of clusters to be used by
repeated runs of itself and its subtests.
Clusters are created and managed at the top-level test level; it is the
top-level tests that determine a cluster's spec, so clusters are not
reused across them.
E.g. roachtest run TestFoo TestBar --parallelism 10 --count 100
will result in the initial allocation of 5 clusters for each test. As
TestBar finishes its 96th run, TestFoo is allowed to steal a "slot" and
expand its pool.

Release note: None
andreimatei committed Oct 5, 2018
1 parent 6adae0e commit 77ac0ec
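For illustration, here is a minimal, self-contained Go sketch (not part of the patch; the helper name initialClusters is made up) of the fair initial division of --parallelism slots among top-level tests that runTests performs in the diff below:

package main

import "fmt"

// initialClusters mirrors the division in runTests: parallelism is capped at
// numTests*count, then split evenly, with the first parallelism%numTests
// tests receiving one extra cluster.
func initialClusters(numTests, count, parallelism int) []int {
	if parallelism > numTests*count {
		parallelism = numTests * count
	}
	perTest := parallelism / numTests
	out := make([]int, numTests)
	for i := range out {
		out[i] = perTest
		if i < parallelism%numTests {
			out[i]++
		}
	}
	return out
}

func main() {
	// The commit message's example: 2 tests, --count 100, --parallelism 10.
	fmt.Println(initialClusters(2, 100, 10)) // [5 5]
	// An uneven split: 3 tests sharing 10 slots.
	fmt.Println(initialClusters(3, 100, 10)) // [4 3 3]
}

Each test then keeps the clusters it was allotted as a pool, and may grow the pool later only when another test releases a slot back to sem.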
169 changes: 153 additions & 16 deletions pkg/cmd/roachtest/test.go
@@ -32,11 +32,13 @@ import (
"sync"
"time"

version "github.com/hashicorp/go-version"
"github.com/petermattis/goid"

"github.com/cockroachdb/cockroach/pkg/cmd/internal/issues"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
version "github.com/hashicorp/go-version"
"github.com/petermattis/goid"
)

var (
@@ -620,6 +622,153 @@ func (t *test) IsBuildVersion(minVersion string) bool {
return !t.registry.buildVersion.LessThan(vers)
}

// runTests decides the initial resource allocation for each top-level test
// (i.e. how many clusters it gets) and then delegates to runTopLevelTest.
func (r *registry) runTests(ctx context.Context, filter []string, count, parallelism int) {
// !!! deal with local

filterRE := makeFilterRE(filter)
topLevelTests := r.List(filterRE)
n := len(topLevelTests)
if parallelism > n*count {
parallelism = n * count
}
sem := make(chan struct{}, parallelism)
clustersPerTLT := parallelism / n
wg := sync.WaitGroup{}
wg.Add(n)
for i, spec := range topLevelTests {
numClusters := clustersPerTLT
if i < parallelism%n {
numClusters++
}
// Acquire slots from sem, to serve as the initial cluster pool to be used
// by this test. Actual cluster creation and destruction happen in
// runTopLevelTest(), as top-level tests determine a cluster's spec.
for j := 0; j < numClusters; j++ {
sem <- struct{}{}
}
go r.runTopLevelTest(ctx, *spec, filterRE, count, sem, numClusters, &wg)
}
// Wait for all tests to finish.
wg.Wait()
}

type clusterFuture chan *cluster

func makeClusterFuture() clusterFuture {
return clusterFuture(make(chan *cluster, 1))
}

func (f clusterFuture) Put(c *cluster) {
f <- c
}

func (f clusterFuture) Get() *cluster {
c := <-f
// The cluster stays in the future so Get can be called again.
f <- c
return c
}

// runTopLevelTest runs a top-level test, and its subtests, count times. A
// number of clusters will be created and then reused, subject to sem.
// reservedSlots represents the number of tokens that have been acquired for us
// from sem by the caller. So, we start off by creating reservedSlots clusters
// and then, as long as we need more, we create them when sem permits. Once a
// cluster is created, it is not released until count is exhausted (even when
// we go above reservedSlots and other tests are competing for sem).
// Before returning, reservedSlots tokens are released to sem.
//
// Args:
// filter: A filter to be passed to subtests. The top-level test is assumed to
// need running, but some subtests might be excluded.
// wg: A WaitGroup that will be signaled when done.
func (r *registry) runTopLevelTest(
ctx context.Context,
spec testSpec,
filter *regexp.Regexp,
count int,
sem chan struct{},
reservedSlots int,
wg *sync.WaitGroup,
) {

// !!! deal with Skip
// !!! deal with debugEnabled

if reservedSlots > count {
log.Fatalf(ctx, "more reservedSlots (%d) than count (%d)", reservedSlots, count)
}

// createClusterAsync asynchronously creates a cluster according to this
// test's spec. Once the cluster has started, the future is signaled.
createClusterAsync := func() clusterFuture {
f := makeClusterFuture()
go func() {
artifactsDir := filepath.Join(artifacts, teamCityNameEscape(spec.Name))
t := &test{
spec: &spec,
registry: r,
artifactsDir: artifactsDir,
}
f.Put(newCluster(ctx, t, spec.Nodes))
}()
return f
}

clusters := make(chan clusterFuture, reservedSlots)
// Create reservedSlots clusters asynchronously.
for i := 0; i < reservedSlots; i++ {
clusters <- createClusterAsync()
}

for run := 0; run < count; {
// Wait until there's a cluster available for us, or a sem slot for creating
// a new cluster. We prefer already-existing clusters to creating new ones,
// hence the funky nested select structure.
var future clusterFuture
select {
case future = <-clusters:
default:
select {
case future = <-clusters:
case sem <- struct{}{}:
clusters <- createClusterAsync()
// Account for the new cluster in reservedSlots, so that we release the
// sem slot at the end.
reservedSlots++
// Now we loop around, but next time around something will be available
// in the clusters channel.
continue
}
}
c := future.Get() // Wait until the cluster we selected has been started.

run++ // We're about to run a test.
runNum := run
if count == 1 {
runNum = 0
}
r.runAsync(ctx, &spec, filter, nil /* parent */, c, runNum, func(_ bool) {
// Return the cluster to the pool.
future := makeClusterFuture()
future.Put(c)
clusters <- future
})
}
// After we've started all the iterations, drain the clusters that we've
// created and release their sem slots. Doing so also waits for all the
// runs to finish.
for i := 0; i < reservedSlots; i++ {
future := <-clusters
c := future.Get()
c.Destroy(ctx)
<-sem
}
wg.Done()
}

// runAsync starts a goroutine that runs a test. If the test has subtests,
// runAsync will be invoked recursively, but in a blocking manner.
//
@@ -788,20 +937,8 @@ func (r *registry) runAsync(
}

if !dryrun {
if c == nil {
c = newCluster(ctx, t, t.spec.Nodes)
if c != nil {
defer func() {
if !debugEnabled || !t.Failed() {
c.Destroy(ctx)
} else {
c.l.Printf("not destroying cluster to allow debugging\n")
}
}()
}
} else {
c = c.clone(t)
}
c = c.clone(t)
c.Wipe(ctx)
}

// If we have subtests, handle them here and return.
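Finally, a minimal, self-contained sketch (not from the patch) of the "prefer an already-existing cluster, otherwise spend a sem slot to create one" logic that the nested select in runTopLevelTest implements above; pool, sem, acquire, and newItem are hypothetical stand-ins for the clusters channel, the parallelism semaphore, and newCluster:

package main

import "fmt"

// acquire returns an item from pool if one is immediately available; only
// when the pool is empty does it block until either an item is returned to
// the pool or a sem slot is won, in which case a new item is created.
func acquire(pool chan string, sem chan struct{}, newItem func() string) string {
	select {
	case it := <-pool: // an existing item is ready; reuse it
		return it
	default:
		// Nothing available right now: wait for whichever comes first.
		select {
		case it := <-pool:
			return it
		case sem <- struct{}{}: // we won a slot; pay for a new item
			return newItem()
		}
	}
}

func main() {
	pool := make(chan string, 2)
	sem := make(chan struct{}, 1)
	pool <- "existing-1"

	created := 0
	newItem := func() string {
		created++
		return fmt.Sprintf("created-%d", created)
	}

	fmt.Println(acquire(pool, sem, newItem)) // existing-1: reused, no sem slot spent
	fmt.Println(acquire(pool, sem, newItem)) // created-1: pool empty, sem slot acquired
}

The outer non-blocking select is what gives existing clusters priority: without it, a returned cluster and a free sem slot would be chosen between at random when both are ready.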
