From b45c5554fcb165cce5afa8d6049395c9413d816e Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Wed, 13 Jul 2022 14:53:10 -0700 Subject: [PATCH 1/3] colflow: prevent deadlocks when many queries spill to disk at same time This commit fixes a long-standing issue which could cause memory-intensive queries to deadlock on acquiring the file descriptors quota when vectorized execution spills to disk. This bug has been present since the introduction of disk-spilling (over two and a half years ago, introduced in #45318 and partially mitigated in #45892), but we haven't seen this in any user reports, only in `tpch_concurrency` roachtest runs, so the severity seems pretty minor. Consider the following query plan: ``` Node 1 Node 2 TableReader TableReader | | HashRouter HashRouter | \ ___________ / | | \/__________ | | / \ | HashAggregator HashAggregator ``` and let's imagine that each hash aggregator has to spill to disk. This would require acquiring the file descriptors quota. Now, imagine that because of that hash aggregators' spilling, each of the hash routers has slow outputs causing them to spill too. As a result, this query plan can require `A + 2 * R` number of FDs of a single node to succeed where `A` is the quota for a single hash aggregator (equal to 16 - with the default value of `COCKROACH_VEC_MAX_OPEN_FDS` environment variable which is 256) and `R` is the quota for a single router output (2). This means that we can estimate that 20 FDs from each node are needed for the query to finish execution with 16 FDs being acquired first. Now imagine that this query is run with concurrency of 16. We can end up in such a situation that all hash aggregators have spilled, fully exhausting the global node limit on each node, so whenever the hash router outputs need to spill, they block forever since no FDs will ever be released, until a query is canceled or a node is shutdown. In other words, we have a deadlock. This commit fixes this situation by introducing a retry mechanism to exponentially backoff when trying to acquire the FD quota, until a time out. The randomizations provided by the `retry` package should be sufficient so that some of the queries succeed while others result in an error. Unfortunately, I don't see a way to prevent this deadlock from occurring in the first place without possible increase in latency in some case. The difficult thing is that we currently acquire FDs only once we need them, meaning once a particular component spills to disk. We could acquire the maximum number of FDs that a query might need up-front, before the query execution starts, but that could lead to starvation of the queries that ultimately won't spill to disk. This seems like a much worse impact than receiving timeout errors on some analytical queries when run with high concurrency. We're not an OLAP database, so this behavior seems ok. Release note (bug fix): Previously, CockroachDB could deadlock when evaluating analytical queries if multiple queries had to spill to disk at the same time. This is now fixed by making some of the queries error out instead. If a user knows that there is no deadlock and that some analytical queries that have spilled just taking too long, blocking other queries from spilling, and is ok with waiting for longer, the user can adjust newly introduced `sql.distsql.acquire_vec_fds.max_retries` cluster setting (using 0 to get the previous behavior of indefinite waiting until spilling resources open up). --- pkg/sql/colexec/colbuilder/execplan.go | 1 + pkg/sql/colexec/external_sort.go | 4 + pkg/sql/colexec/hash_based_partitioner.go | 3 + pkg/sql/colflow/BUILD.bazel | 6 ++ pkg/sql/colflow/vectorized_flow.go | 99 ++++++++++++++----- .../colflow/vectorized_flow_deadlock_test.go | 77 +++++++++++++++ pkg/sql/execinfra/server_config.go | 9 ++ 7 files changed, 175 insertions(+), 24 deletions(-) create mode 100644 pkg/sql/colflow/vectorized_flow_deadlock_test.go diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 70ac326aab86..ba9710b0ff81 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -444,6 +444,7 @@ func (r opResult) createDiskBackedSort( args.DiskQueueCfg, args.FDSemaphore, diskAccount, + flowCtx.TestingKnobs().VecFDsToAcquire, ) r.ToClose = append(r.ToClose, es.(colexecop.Closer)) return es diff --git a/pkg/sql/colexec/external_sort.go b/pkg/sql/colexec/external_sort.go index 606b796f37e0..6176a042e805 100644 --- a/pkg/sql/colexec/external_sort.go +++ b/pkg/sql/colexec/external_sort.go @@ -231,6 +231,7 @@ func NewExternalSorter( diskQueueCfg colcontainer.DiskQueueCfg, fdSemaphore semaphore.Semaphore, diskAcc *mon.BoundAccount, + testingVecFDsToAcquire int, ) colexecop.Operator { if diskQueueCfg.BufferSizeBytes > 0 && maxNumberPartitions == 0 { // With the default limit of 256 file descriptors, this results in 16 @@ -242,6 +243,9 @@ func NewExternalSorter( if maxNumberPartitions < colexecop.ExternalSorterMinPartitions { maxNumberPartitions = colexecop.ExternalSorterMinPartitions } + if testingVecFDsToAcquire > 0 { + maxNumberPartitions = testingVecFDsToAcquire + } if memoryLimit == 1 { // If memory limit is 1, we're likely in a "force disk spill" // scenario, but we don't want to artificially limit batches when we diff --git a/pkg/sql/colexec/hash_based_partitioner.go b/pkg/sql/colexec/hash_based_partitioner.go index adfbca768d03..8824bd6601e7 100644 --- a/pkg/sql/colexec/hash_based_partitioner.go +++ b/pkg/sql/colexec/hash_based_partitioner.go @@ -307,6 +307,9 @@ func calculateMaxNumberActivePartitions( if maxNumberActivePartitions < numRequiredActivePartitions { maxNumberActivePartitions = numRequiredActivePartitions } + if toAcquire := flowCtx.TestingKnobs().VecFDsToAcquire; toAcquire > 0 { + maxNumberActivePartitions = toAcquire + } return maxNumberActivePartitions } diff --git a/pkg/sql/colflow/BUILD.bazel b/pkg/sql/colflow/BUILD.bazel index a59138309c3f..13f3b0ecf475 100644 --- a/pkg/sql/colflow/BUILD.bazel +++ b/pkg/sql/colflow/BUILD.bazel @@ -32,6 +32,8 @@ go_library( "//pkg/sql/execinfra", "//pkg/sql/execinfrapb", "//pkg/sql/flowinfra", + "//pkg/sql/pgwire/pgcode", + "//pkg/sql/pgwire/pgerror", "//pkg/sql/rowenc", "//pkg/sql/rowexec", "//pkg/sql/sessiondatapb", @@ -44,6 +46,7 @@ go_library( "//pkg/util/mon", "//pkg/util/optional", "//pkg/util/randutil", + "//pkg/util/retry", "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/tracing", @@ -65,6 +68,7 @@ go_test( "main_test.go", "routers_test.go", "stats_test.go", + "vectorized_flow_deadlock_test.go", "vectorized_flow_planning_test.go", "vectorized_flow_shutdown_test.go", "vectorized_flow_space_test.go", @@ -118,6 +122,8 @@ go_test( "//pkg/testutils/testcluster", "//pkg/util/admission", "//pkg/util/buildutil", + "//pkg/util/cancelchecker", + "//pkg/util/envutil", "//pkg/util/hlc", "//pkg/util/humanizeutil", "//pkg/util/leaktest", diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 410fc66dd523..96e1b408f0b3 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -34,6 +34,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -44,6 +46,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/optional" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -52,39 +55,84 @@ import ( "github.com/marusama/semaphore" ) -// countingSemaphore is a semaphore that keeps track of the semaphore count from -// its perspective. -// Note that it effectively implements the execinfra.Releasable interface but -// due to the method name conflict doesn't. -type countingSemaphore struct { +// fdCountingSemaphore is a semaphore that keeps track of the number of file +// descriptors currently used by the vectorized engine. +// +// Note that it effectively implements the execreleasable.Releasable interface +// but due to the method name conflict doesn't. +type fdCountingSemaphore struct { semaphore.Semaphore - globalCount *metric.Gauge - count int64 + globalCount *metric.Gauge + count int64 + testingAcquireMaxRetries int } -var countingSemaphorePool = sync.Pool{ +var fdCountingSemaphorePool = sync.Pool{ New: func() interface{} { - return &countingSemaphore{} + return &fdCountingSemaphore{} }, } -func newCountingSemaphore(sem semaphore.Semaphore, globalCount *metric.Gauge) *countingSemaphore { - s := countingSemaphorePool.Get().(*countingSemaphore) +func newFDCountingSemaphore( + sem semaphore.Semaphore, globalCount *metric.Gauge, testingAcquireMaxRetries int, +) *fdCountingSemaphore { + s := fdCountingSemaphorePool.Get().(*fdCountingSemaphore) s.Semaphore = sem s.globalCount = globalCount + s.testingAcquireMaxRetries = testingAcquireMaxRetries return s } -func (s *countingSemaphore) Acquire(ctx context.Context, n int) error { - if err := s.Semaphore.Acquire(ctx, n); err != nil { - return err +var errAcquireTimeout = pgerror.New( + pgcode.ConfigurationLimitExceeded, + "acquiring of file descriptors timed out, consider increasing "+ + "COCKROACH_VEC_MAX_OPEN_FDS environment variable", +) + +func (s *fdCountingSemaphore) Acquire(ctx context.Context, n int) error { + if s.TryAcquire(n) { + return nil } - atomic.AddInt64(&s.count, int64(n)) - s.globalCount.Inc(int64(n)) - return nil + // Currently there is not enough capacity in the semaphore to acquire the + // desired number, so we set up a retry loop that exponentially backs off, + // until either the semaphore opens up or we time out (most likely due to a + // deadlock). + // + // The latter situation is possible when multiple queries already hold some + // of the quota and each of them needs more to proceed resulting in a + // deadlock. We get out of such a deadlock by randomly erroring out one of + // the queries (which would release some quota back to the semaphore) making + // it possible for other queries to proceed. + // + // Note that we've already tried to acquire the quota above (which failed), + // so the initial backoff time of 100ms seems ok (we are spilling to disk + // after all, so the query is likely to experience significant latency). The + // current choice of options is such that we'll spend on the order of 25s + // in the retry loop before timing out. + maxRetries := s.testingAcquireMaxRetries + if maxRetries <= 0 { + // Make sure that the retry loop is finite. + maxRetries = 8 + } + opts := retry.Options{ + InitialBackoff: 100 * time.Millisecond, + Multiplier: 2.0, + RandomizationFactor: 0.25, + MaxRetries: maxRetries, + } + for r := retry.StartWithCtx(ctx, opts); r.Next(); { + if s.TryAcquire(n) { + return nil + } + } + if ctx.Err() != nil { + return ctx.Err() + } + log.Warning(ctx, "acquiring of file descriptors for disk-spilling timed out") + return errAcquireTimeout } -func (s *countingSemaphore) TryAcquire(n int) bool { +func (s *fdCountingSemaphore) TryAcquire(n int) bool { success := s.Semaphore.TryAcquire(n) if !success { return false @@ -94,7 +142,7 @@ func (s *countingSemaphore) TryAcquire(n int) bool { return success } -func (s *countingSemaphore) Release(n int) int { +func (s *fdCountingSemaphore) Release(n int) int { atomic.AddInt64(&s.count, int64(-n)) s.globalCount.Dec(int64(n)) return s.Semaphore.Release(n) @@ -103,12 +151,12 @@ func (s *countingSemaphore) Release(n int) int { // ReleaseToPool should be named Release and should implement the // execinfra.Releasable interface, but that would lead to a conflict with // semaphore.Semaphore.Release method. -func (s *countingSemaphore) ReleaseToPool() { +func (s *fdCountingSemaphore) ReleaseToPool() { if unreleased := atomic.LoadInt64(&s.count); unreleased != 0 { colexecerror.InternalError(errors.Newf("unexpectedly %d count on the semaphore when releasing it to the pool", unreleased)) } - *s = countingSemaphore{} - countingSemaphorePool.Put(s) + *s = fdCountingSemaphore{} + fdCountingSemaphorePool.Put(s) } type vectorizedFlow struct { @@ -127,7 +175,7 @@ type vectorizedFlow struct { // of the number of resources held in a semaphore.Semaphore requested from the // context of this flow so that these can be released unconditionally upon // Cleanup. - countingSemaphore *countingSemaphore + countingSemaphore *fdCountingSemaphore tempStorage struct { syncutil.Mutex @@ -195,8 +243,11 @@ func (f *vectorizedFlow) Setup( if err := diskQueueCfg.EnsureDefaults(); err != nil { return ctx, nil, err } - f.countingSemaphore = newCountingSemaphore(f.Cfg.VecFDSemaphore, f.Cfg.Metrics.VecOpenFDs) flowCtx := f.GetFlowCtx() + f.countingSemaphore = newFDCountingSemaphore( + f.Cfg.VecFDSemaphore, f.Cfg.Metrics.VecOpenFDs, + flowCtx.TestingKnobs().VecFDsAcquireMaxRetriesCount, + ) f.creator = newVectorizedFlowCreator( helper, vectorizedRemoteComponentCreator{}, diff --git a/pkg/sql/colflow/vectorized_flow_deadlock_test.go b/pkg/sql/colflow/vectorized_flow_deadlock_test.go new file mode 100644 index 000000000000..c823f50dc856 --- /dev/null +++ b/pkg/sql/colflow/vectorized_flow_deadlock_test.go @@ -0,0 +1,77 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package colflow_test + +import ( + "context" + "strconv" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" + "github.com/cockroachdb/cockroach/pkg/util/envutil" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +// TestVectorizedFlowDeadlocksWhenSpilling is a regression test for the +// vectorized flow being deadlocked when multiple operators have to spill to +// disk exhausting the file descriptor limit. +func TestVectorizedFlowDeadlocksWhenSpilling(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderStressRace(t, "the test is too slow under stressrace") + + vecFDsLimit := 8 + envutil.TestSetEnv(t, "COCKROACH_VEC_MAX_OPEN_FDS", strconv.Itoa(vecFDsLimit)) + serverArgs := base.TestServerArgs{ + Knobs: base.TestingKnobs{DistSQL: &execinfra.TestingKnobs{ + // Set the testing knob so that the first operator to spill would + // use up the whole FD limit. + VecFDsToAcquire: vecFDsLimit, + // Allow just one retry to speed up the test. + VecFDsAcquireMaxRetriesCount: 1, + }}, + } + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: serverArgs}) + ctx := context.Background() + defer tc.Stopper().Stop(ctx) + conn := tc.Conns[0] + + _, err := conn.ExecContext(ctx, "CREATE TABLE t (a, b) AS SELECT i, i FROM generate_series(1, 10000) AS g(i)") + require.NoError(t, err) + // Lower the workmem budget so that all buffering operators have to spill to + // disk. + _, err = conn.ExecContext(ctx, "SET distsql_workmem = '1KiB'") + require.NoError(t, err) + + queryCtx, queryCtxCancel := context.WithDeadline(ctx, timeutil.Now().Add(10*time.Second)) + defer queryCtxCancel() + // Run a query with a hash joiner feeding into a hash aggregator, with both + // operators spilling to disk. We expect that the hash aggregator won't be + // able to spill though since the FD limit has been used up, and we'd like + // to see the query timing out (when acquiring the file descriptor quota) + // rather than being canceled due to the context deadline. + query := "SELECT max(a) FROM (SELECT t1.a, t1.b FROM t AS t1 INNER HASH JOIN t AS t2 ON t1.a = t2.b) GROUP BY b" + _, err = conn.ExecContext(queryCtx, query) + // We expect an error that is different from the query cancellation (which + // is what SQL layer returns on a context cancellation). + require.NotNil(t, err) + require.False(t, strings.Contains(err.Error(), cancelchecker.QueryCanceledError.Error())) +} diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index a8c59f153a1b..6c776c250939 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -244,6 +244,15 @@ type TestingKnobs struct { // Cannot be set together with ForceDiskSpill. MemoryLimitBytes int64 + // VecFDsToAcquire, if positive, indicates the number of file descriptors + // that should be acquired by a single disk-spilling operator in the + // vectorized engine. + VecFDsToAcquire int + // VecFDsAcquireMaxRetriesCount, if positive, determines the maximum number + // of retries done when acquiring the file descriptors for a disk-spilling + // operator in the vectorized engine. + VecFDsAcquireMaxRetriesCount int + // TableReaderBatchBytesLimit, if not 0, overrides the limit that the // TableReader will set on the size of results it wants to get for individual // requests. From bfe4fa69a019af1bfc0db945e6bfc6d4a1679fd3 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 14 Jul 2022 07:47:13 -0700 Subject: [PATCH 2/3] roachtest: remove some debugging printouts in tpch_concurrency This was added to track down the deadlock fixed in the previous commit, so we no longer need it. Release note: None --- pkg/cmd/roachtest/tests/tpch_concurrency.go | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/pkg/cmd/roachtest/tests/tpch_concurrency.go b/pkg/cmd/roachtest/tests/tpch_concurrency.go index e51236fb9ba3..3ee60dbe27ae 100644 --- a/pkg/cmd/roachtest/tests/tpch_concurrency.go +++ b/pkg/cmd/roachtest/tests/tpch_concurrency.go @@ -13,7 +13,6 @@ package tests import ( "context" "fmt" - "strings" "time" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" @@ -96,22 +95,6 @@ func registerTPCHConcurrency(r registry.Registry) { // Run each query once on each connection. for queryNum := 1; queryNum <= tpch.NumQueries; queryNum++ { t.Status("running Q", queryNum) - // To aid during the debugging later, we'll print the DistSQL - // diagram of the query. - rows, err := conn.Query("EXPLAIN (DISTSQL) " + tpch.QueriesByNumber[queryNum]) - if err != nil { - return err - } - defer rows.Close() - for rows.Next() { - var line string - if err = rows.Scan(&line); err != nil { - t.Fatal(err) - } - if strings.Contains(line, "Diagram:") { - t.Status(line) - } - } // The way --max-ops flag works is as follows: the global ops // counter is incremented **after** each worker completes a // single operation, so it is possible for all connections start From 9d9bf55f83ea70c9ee5d625a8e7f284ca58ddbe9 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 19 Jul 2022 16:44:43 -0700 Subject: [PATCH 3/3] colflow: introduce a cluster setting for max retries of FD acquisition We recently introduced a mechanism of retrying the acquision of file descriptors needed for the disk-spilling queries to be able to get out of a deadlock. We hard-coded the number of retries at 8, and this commit makes that number configurable via a cluster setting (the idea is that some users might be ok retrying for longer, so they will have an option to do that). This cluster setting will also be the escape hatch to the previous behavior (indefinite wait on `Acquire`) when the setting is set to zero. Release note: None --- pkg/sql/colflow/BUILD.bazel | 1 + pkg/sql/colflow/vectorized_flow.go | 34 +++++++++++-------- .../colflow/vectorized_flow_deadlock_test.go | 5 +-- pkg/sql/execinfra/server_config.go | 4 --- 4 files changed, 24 insertions(+), 20 deletions(-) diff --git a/pkg/sql/colflow/BUILD.bazel b/pkg/sql/colflow/BUILD.bazel index 13f3b0ecf475..83b9cb980c7b 100644 --- a/pkg/sql/colflow/BUILD.bazel +++ b/pkg/sql/colflow/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//pkg/col/coldataext", "//pkg/roachpb", "//pkg/rpc/nodedialer", + "//pkg/settings", "//pkg/sql/catalog/descs", "//pkg/sql/colcontainer", "//pkg/sql/colexec", diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 96e1b408f0b3..35013565ff50 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/colcontainer" "github.com/cockroachdb/cockroach/pkg/sql/colexec" @@ -62,9 +63,9 @@ import ( // but due to the method name conflict doesn't. type fdCountingSemaphore struct { semaphore.Semaphore - globalCount *metric.Gauge - count int64 - testingAcquireMaxRetries int + globalCount *metric.Gauge + count int64 + acquireMaxRetries int } var fdCountingSemaphorePool = sync.Pool{ @@ -74,12 +75,12 @@ var fdCountingSemaphorePool = sync.Pool{ } func newFDCountingSemaphore( - sem semaphore.Semaphore, globalCount *metric.Gauge, testingAcquireMaxRetries int, + sem semaphore.Semaphore, globalCount *metric.Gauge, sv *settings.Values, ) *fdCountingSemaphore { s := fdCountingSemaphorePool.Get().(*fdCountingSemaphore) s.Semaphore = sem s.globalCount = globalCount - s.testingAcquireMaxRetries = testingAcquireMaxRetries + s.acquireMaxRetries = int(fdCountingSemaphoreMaxRetries.Get(sv)) return s } @@ -89,6 +90,16 @@ var errAcquireTimeout = pgerror.New( "COCKROACH_VEC_MAX_OPEN_FDS environment variable", ) +var fdCountingSemaphoreMaxRetries = settings.RegisterIntSetting( + settings.TenantWritable, + "sql.distsql.acquire_vec_fds.max_retries", + "determines the number of retries performed during the acquisition of "+ + "file descriptors needed for disk-spilling operations, set to 0 for "+ + "unlimited retries", + 8, + settings.NonNegativeInt, +) + func (s *fdCountingSemaphore) Acquire(ctx context.Context, n int) error { if s.TryAcquire(n) { return nil @@ -108,17 +119,13 @@ func (s *fdCountingSemaphore) Acquire(ctx context.Context, n int) error { // so the initial backoff time of 100ms seems ok (we are spilling to disk // after all, so the query is likely to experience significant latency). The // current choice of options is such that we'll spend on the order of 25s - // in the retry loop before timing out. - maxRetries := s.testingAcquireMaxRetries - if maxRetries <= 0 { - // Make sure that the retry loop is finite. - maxRetries = 8 - } + // in the retry loop before timing out with the default value of the + // 'sql.distsql.acquire_vec_fds.max_retries' cluster settings. opts := retry.Options{ InitialBackoff: 100 * time.Millisecond, Multiplier: 2.0, RandomizationFactor: 0.25, - MaxRetries: maxRetries, + MaxRetries: s.acquireMaxRetries, } for r := retry.StartWithCtx(ctx, opts); r.Next(); { if s.TryAcquire(n) { @@ -245,8 +252,7 @@ func (f *vectorizedFlow) Setup( } flowCtx := f.GetFlowCtx() f.countingSemaphore = newFDCountingSemaphore( - f.Cfg.VecFDSemaphore, f.Cfg.Metrics.VecOpenFDs, - flowCtx.TestingKnobs().VecFDsAcquireMaxRetriesCount, + f.Cfg.VecFDSemaphore, f.Cfg.Metrics.VecOpenFDs, &flowCtx.EvalCtx.Settings.SV, ) f.creator = newVectorizedFlowCreator( helper, diff --git a/pkg/sql/colflow/vectorized_flow_deadlock_test.go b/pkg/sql/colflow/vectorized_flow_deadlock_test.go index c823f50dc856..e6f50d65048a 100644 --- a/pkg/sql/colflow/vectorized_flow_deadlock_test.go +++ b/pkg/sql/colflow/vectorized_flow_deadlock_test.go @@ -45,8 +45,6 @@ func TestVectorizedFlowDeadlocksWhenSpilling(t *testing.T) { // Set the testing knob so that the first operator to spill would // use up the whole FD limit. VecFDsToAcquire: vecFDsLimit, - // Allow just one retry to speed up the test. - VecFDsAcquireMaxRetriesCount: 1, }}, } tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: serverArgs}) @@ -60,6 +58,9 @@ func TestVectorizedFlowDeadlocksWhenSpilling(t *testing.T) { // disk. _, err = conn.ExecContext(ctx, "SET distsql_workmem = '1KiB'") require.NoError(t, err) + // Allow just one retry to speed up the test. + _, err = conn.ExecContext(ctx, "SET CLUSTER SETTING sql.distsql.acquire_vec_fds.max_retries = 1") + require.NoError(t, err) queryCtx, queryCtxCancel := context.WithDeadline(ctx, timeutil.Now().Add(10*time.Second)) defer queryCtxCancel() diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index 6c776c250939..17230d924317 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -248,10 +248,6 @@ type TestingKnobs struct { // that should be acquired by a single disk-spilling operator in the // vectorized engine. VecFDsToAcquire int - // VecFDsAcquireMaxRetriesCount, if positive, determines the maximum number - // of retries done when acquiring the file descriptors for a disk-spilling - // operator in the vectorized engine. - VecFDsAcquireMaxRetriesCount int // TableReaderBatchBytesLimit, if not 0, overrides the limit that the // TableReader will set on the size of results it wants to get for individual