Skip to content

Commit

Permalink
Merge pull request #84657 from yuzefovich/backport22.1-84398
Browse files Browse the repository at this point in the history
release-22.1: colflow: prevent deadlocks when many queries spill to disk at same time
  • Loading branch information
yuzefovich authored Jul 21, 2022
2 parents 0b9023d + 9d9bf55 commit ac6ee5e
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 41 deletions.
17 changes: 0 additions & 17 deletions pkg/cmd/roachtest/tests/tpch_concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package tests
import (
"context"
"fmt"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
Expand Down Expand Up @@ -96,22 +95,6 @@ func registerTPCHConcurrency(r registry.Registry) {
// Run each query once on each connection.
for queryNum := 1; queryNum <= tpch.NumQueries; queryNum++ {
t.Status("running Q", queryNum)
// To aid during the debugging later, we'll print the DistSQL
// diagram of the query.
rows, err := conn.Query("EXPLAIN (DISTSQL) " + tpch.QueriesByNumber[queryNum])
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var line string
if err = rows.Scan(&line); err != nil {
t.Fatal(err)
}
if strings.Contains(line, "Diagram:") {
t.Status(line)
}
}
// The way --max-ops flag works is as follows: the global ops
// counter is incremented **after** each worker completes a
// single operation, so it is possible for all connections to start
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ func (r opResult) createDiskBackedSort(
args.DiskQueueCfg,
args.FDSemaphore,
diskAccount,
flowCtx.TestingKnobs().VecFDsToAcquire,
)
r.ToClose = append(r.ToClose, es.(colexecop.Closer))
return es
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/colexec/external_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ func NewExternalSorter(
diskQueueCfg colcontainer.DiskQueueCfg,
fdSemaphore semaphore.Semaphore,
diskAcc *mon.BoundAccount,
testingVecFDsToAcquire int,
) colexecop.Operator {
if diskQueueCfg.BufferSizeBytes > 0 && maxNumberPartitions == 0 {
// With the default limit of 256 file descriptors, this results in 16
Expand All @@ -242,6 +243,9 @@ func NewExternalSorter(
if maxNumberPartitions < colexecop.ExternalSorterMinPartitions {
maxNumberPartitions = colexecop.ExternalSorterMinPartitions
}
if testingVecFDsToAcquire > 0 {
maxNumberPartitions = testingVecFDsToAcquire
}
if memoryLimit == 1 {
// If memory limit is 1, we're likely in a "force disk spill"
// scenario, but we don't want to artificially limit batches when we
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/colexec/hash_based_partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,9 @@ func calculateMaxNumberActivePartitions(
if maxNumberActivePartitions < numRequiredActivePartitions {
maxNumberActivePartitions = numRequiredActivePartitions
}
if toAcquire := flowCtx.TestingKnobs().VecFDsToAcquire; toAcquire > 0 {
maxNumberActivePartitions = toAcquire
}
return maxNumberActivePartitions
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/colflow/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"//pkg/col/coldataext",
"//pkg/roachpb",
"//pkg/rpc/nodedialer",
"//pkg/settings",
"//pkg/sql/catalog/descs",
"//pkg/sql/colcontainer",
"//pkg/sql/colexec",
Expand All @@ -32,6 +33,8 @@ go_library(
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/flowinfra",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/rowenc",
"//pkg/sql/rowexec",
"//pkg/sql/sessiondatapb",
Expand All @@ -44,6 +47,7 @@ go_library(
"//pkg/util/mon",
"//pkg/util/optional",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
Expand All @@ -65,6 +69,7 @@ go_test(
"main_test.go",
"routers_test.go",
"stats_test.go",
"vectorized_flow_deadlock_test.go",
"vectorized_flow_planning_test.go",
"vectorized_flow_shutdown_test.go",
"vectorized_flow_space_test.go",
Expand Down Expand Up @@ -118,6 +123,8 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/util/admission",
"//pkg/util/buildutil",
"//pkg/util/cancelchecker",
"//pkg/util/envutil",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/leaktest",
Expand Down
105 changes: 81 additions & 24 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/col/coldataext"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/colcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/colexec"
Expand All @@ -34,6 +35,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
Expand All @@ -44,6 +47,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/optional"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand All @@ -52,39 +56,90 @@ import (
"github.com/marusama/semaphore"
)

// countingSemaphore is a semaphore that keeps track of the semaphore count from
// its perspective.
// Note that it effectively implements the execinfra.Releasable interface but
// due to the method name conflict doesn't.
type countingSemaphore struct {
// fdCountingSemaphore is a semaphore that keeps track of the number of file
// descriptors currently used by the vectorized engine.
//
// Note that it effectively implements the execreleasable.Releasable interface
// but due to the method name conflict doesn't.
type fdCountingSemaphore struct {
semaphore.Semaphore
globalCount *metric.Gauge
count int64
globalCount *metric.Gauge
count int64
acquireMaxRetries int
}

var countingSemaphorePool = sync.Pool{
var fdCountingSemaphorePool = sync.Pool{
New: func() interface{} {
return &countingSemaphore{}
return &fdCountingSemaphore{}
},
}

func newCountingSemaphore(sem semaphore.Semaphore, globalCount *metric.Gauge) *countingSemaphore {
s := countingSemaphorePool.Get().(*countingSemaphore)
func newFDCountingSemaphore(
sem semaphore.Semaphore, globalCount *metric.Gauge, sv *settings.Values,
) *fdCountingSemaphore {
s := fdCountingSemaphorePool.Get().(*fdCountingSemaphore)
s.Semaphore = sem
s.globalCount = globalCount
s.acquireMaxRetries = int(fdCountingSemaphoreMaxRetries.Get(sv))
return s
}

func (s *countingSemaphore) Acquire(ctx context.Context, n int) error {
if err := s.Semaphore.Acquire(ctx, n); err != nil {
return err
// errAcquireTimeout is the error returned by fdCountingSemaphore.Acquire when
// the requested file descriptors could not be acquired before the retry loop
// ran out of retries. It carries the pgcode.ConfigurationLimitExceeded code
// and points the user at the COCKROACH_VEC_MAX_OPEN_FDS environment variable.
var errAcquireTimeout = pgerror.New(
pgcode.ConfigurationLimitExceeded,
"acquiring of file descriptors timed out, consider increasing "+
"COCKROACH_VEC_MAX_OPEN_FDS environment variable",
)

// fdCountingSemaphoreMaxRetries is the tenant-writable cluster setting
// 'sql.distsql.acquire_vec_fds.max_retries'. It bounds how many times
// fdCountingSemaphore.Acquire retries acquiring file descriptors after the
// initial attempt fails. The default is 8, the value must be non-negative, and
// 0 means unlimited retries. The setting is read once per semaphore, in
// newFDCountingSemaphore.
var fdCountingSemaphoreMaxRetries = settings.RegisterIntSetting(
settings.TenantWritable,
"sql.distsql.acquire_vec_fds.max_retries",
"determines the number of retries performed during the acquisition of "+
"file descriptors needed for disk-spilling operations, set to 0 for "+
"unlimited retries",
8,
settings.NonNegativeInt,
)

func (s *fdCountingSemaphore) Acquire(ctx context.Context, n int) error {
if s.TryAcquire(n) {
return nil
}
atomic.AddInt64(&s.count, int64(n))
s.globalCount.Inc(int64(n))
return nil
// Currently there is not enough capacity in the semaphore to acquire the
// desired number, so we set up a retry loop that exponentially backs off,
// until either the semaphore opens up or we time out (most likely due to a
// deadlock).
//
// The latter situation is possible when multiple queries already hold some
// of the quota and each of them needs more to proceed resulting in a
// deadlock. We get out of such a deadlock by randomly erroring out one of
// the queries (which would release some quota back to the semaphore) making
// it possible for other queries to proceed.
//
// Note that we've already tried to acquire the quota above (which failed),
// so the initial backoff time of 100ms seems ok (we are spilling to disk
// after all, so the query is likely to experience significant latency). The
// current choice of options is such that we'll spend on the order of 25s
// in the retry loop before timing out with the default value of the
// 'sql.distsql.acquire_vec_fds.max_retries' cluster setting.
opts := retry.Options{
InitialBackoff: 100 * time.Millisecond,
Multiplier: 2.0,
RandomizationFactor: 0.25,
MaxRetries: s.acquireMaxRetries,
}
for r := retry.StartWithCtx(ctx, opts); r.Next(); {
if s.TryAcquire(n) {
return nil
}
}
if ctx.Err() != nil {
return ctx.Err()
}
log.Warning(ctx, "acquiring of file descriptors for disk-spilling timed out")
return errAcquireTimeout
}

func (s *countingSemaphore) TryAcquire(n int) bool {
func (s *fdCountingSemaphore) TryAcquire(n int) bool {
success := s.Semaphore.TryAcquire(n)
if !success {
return false
Expand All @@ -94,7 +149,7 @@ func (s *countingSemaphore) TryAcquire(n int) bool {
return success
}

func (s *countingSemaphore) Release(n int) int {
func (s *fdCountingSemaphore) Release(n int) int {
atomic.AddInt64(&s.count, int64(-n))
s.globalCount.Dec(int64(n))
return s.Semaphore.Release(n)
Expand All @@ -103,12 +158,12 @@ func (s *countingSemaphore) Release(n int) int {
// ReleaseToPool should be named Release and should implement the
// execinfra.Releasable interface, but that would lead to a conflict with
// semaphore.Semaphore.Release method.
func (s *countingSemaphore) ReleaseToPool() {
func (s *fdCountingSemaphore) ReleaseToPool() {
if unreleased := atomic.LoadInt64(&s.count); unreleased != 0 {
colexecerror.InternalError(errors.Newf("unexpectedly %d count on the semaphore when releasing it to the pool", unreleased))
}
*s = countingSemaphore{}
countingSemaphorePool.Put(s)
*s = fdCountingSemaphore{}
fdCountingSemaphorePool.Put(s)
}

type vectorizedFlow struct {
Expand All @@ -127,7 +182,7 @@ type vectorizedFlow struct {
// of the number of resources held in a semaphore.Semaphore requested from the
// context of this flow so that these can be released unconditionally upon
// Cleanup.
countingSemaphore *countingSemaphore
countingSemaphore *fdCountingSemaphore

tempStorage struct {
syncutil.Mutex
Expand Down Expand Up @@ -195,8 +250,10 @@ func (f *vectorizedFlow) Setup(
if err := diskQueueCfg.EnsureDefaults(); err != nil {
return ctx, nil, err
}
f.countingSemaphore = newCountingSemaphore(f.Cfg.VecFDSemaphore, f.Cfg.Metrics.VecOpenFDs)
flowCtx := f.GetFlowCtx()
f.countingSemaphore = newFDCountingSemaphore(
f.Cfg.VecFDSemaphore, f.Cfg.Metrics.VecOpenFDs, &flowCtx.EvalCtx.Settings.SV,
)
f.creator = newVectorizedFlowCreator(
helper,
vectorizedRemoteComponentCreator{},
Expand Down
78 changes: 78 additions & 0 deletions pkg/sql/colflow/vectorized_flow_deadlock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colflow_test

import (
"context"
"strconv"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

// TestVectorizedFlowDeadlocksWhenSpilling is a regression test checking that a
// vectorized flow does not hang when several of its operators need to spill to
// disk but the file descriptor limit has already been used up: the query must
// fail with an error other than a query cancellation.
func TestVectorizedFlowDeadlocksWhenSpilling(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	skip.UnderStressRace(t, "the test is too slow under stressrace")

	// The testing knob below makes the first operator that spills take the
	// whole FD limit, leaving nothing for any other spilling operator.
	const fdLimit = 8
	envutil.TestSetEnv(t, "COCKROACH_VEC_MAX_OPEN_FDS", strconv.Itoa(fdLimit))

	knobs := base.TestingKnobs{
		DistSQL: &execinfra.TestingKnobs{VecFDsToAcquire: fdLimit},
	}
	ctx := context.Background()
	tc := testcluster.StartTestCluster(
		t, 1, base.TestClusterArgs{ServerArgs: base.TestServerArgs{Knobs: knobs}},
	)
	defer tc.Stopper().Stop(ctx)
	db := tc.Conns[0]

	setupStmts := []string{
		"CREATE TABLE t (a, b) AS SELECT i, i FROM generate_series(1, 10000) AS g(i)",
		// A tiny workmem budget forces every buffering operator to spill to
		// disk.
		"SET distsql_workmem = '1KiB'",
		// A single retry keeps the test fast.
		"SET CLUSTER SETTING sql.distsql.acquire_vec_fds.max_retries = 1",
	}
	for _, stmt := range setupStmts {
		_, err := db.ExecContext(ctx, stmt)
		require.NoError(t, err)
	}

	deadline := timeutil.Now().Add(10 * time.Second)
	queryCtx, cancelQuery := context.WithDeadline(ctx, deadline)
	defer cancelQuery()

	// The query plans a hash joiner that feeds a hash aggregator, and both
	// have to spill. The aggregator is expected to be unable to spill because
	// the FD limit is already exhausted, so the query should time out while
	// acquiring the file descriptor quota instead of being canceled by the
	// context deadline.
	const query = "SELECT max(a) FROM (SELECT t1.a, t1.b FROM t AS t1 INNER HASH JOIN t AS t2 ON t1.a = t2.b) GROUP BY b"
	_, err := db.ExecContext(queryCtx, query)

	// An error is required, and it must not be the query cancellation error
	// that the SQL layer returns on a context cancellation.
	require.NotNil(t, err)
	canceledMsg := cancelchecker.QueryCanceledError.Error()
	require.False(t, strings.Contains(err.Error(), canceledMsg))
}
5 changes: 5 additions & 0 deletions pkg/sql/execinfra/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@ type TestingKnobs struct {
// Cannot be set together with ForceDiskSpill.
MemoryLimitBytes int64

// VecFDsToAcquire, if positive, indicates the number of file descriptors
// that should be acquired by a single disk-spilling operator in the
// vectorized engine.
VecFDsToAcquire int

// TableReaderBatchBytesLimit, if not 0, overrides the limit that the
// TableReader will set on the size of results it wants to get for individual
// requests.
Expand Down

0 comments on commit ac6ee5e

Please sign in to comment.