74563: kv,kvcoord,sql: poison txnCoordSender after a retryable error r=lidorcarmel a=lidorcarmel

Previously, kv users could lose parts of a transaction without getting an
error. After Send() returned a retryable error, the txn's state was reset,
which made it usable again. If the caller ignored the error, they could
continue issuing more operations without realizing that the first part of the
transaction had been discarded. See the issue (#22615) for more details.

The simplest example is when the retryable closure of DB.Txn() returns nil
instead of returning the retryable error back to the retry loop: in this
case the retry loop declares success without realizing we lost the first part
of the transaction (all the operations before the retryable error).
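A minimal sketch of that pattern (illustrative only; `ctx`, `db`, and the keys
are assumed): before this change, the loop below could report success even
though the earlier Put was discarded.

```go
// Sketch of the problematic pattern described above. Swallowing the
// retryable error by returning nil used to make the retry loop declare
// success, silently dropping the Put of "k1"; with this PR the txn is
// poisoned instead, so later ops and the commit fail fast.
err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
	if err := txn.Put(ctx, "k1", "v1"); err != nil {
		return err
	}
	if _, err := txn.Get(ctx, "k2"); err != nil {
		return nil // swallows a possible retryable error
	}
	return txn.Put(ctx, "k2", "v2")
})
_ = err // handle/return as appropriate in real code
```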

This PR leaves the txn in a "poisoned" state after encountering an error, so
that all future operations fail fast. The caller is therefore expected to
explicitly reset the txn handle to a usable state by calling
Txn.PrepareForRetry(). In the simple case of DB.Txn(), the retry loop will
reset the handle and run the retry even if the callback returned nil.
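For callers that drive retries themselves rather than using DB.Txn(), the flow
now looks roughly like the sketch below. This is a hedged illustration: the
helper name and the exact PrepareForRetry signature used here are assumptions,
and the kv, roachpb, and cockroachdb/errors packages are assumed to be
imported.

```go
// runWithManualRetries sketches a caller-managed retry loop under the new
// poisoned-txn behavior. Illustrative only; not code from this PR.
func runWithManualRetries(
	ctx context.Context, db *kv.DB, work func(context.Context, *kv.Txn) error,
) error {
	txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */)
	for {
		err := work(ctx, txn)
		if err == nil {
			err = txn.Commit(ctx)
		}
		if err == nil {
			return nil
		}
		if errors.HasType(err, (*roachpb.TransactionRetryWithProtoRefreshError)(nil)) {
			// The handle is poisoned after the retryable error: every further
			// op fails fast until the caller explicitly resets it.
			txn.PrepareForRetry(ctx)
			continue
		}
		_ = txn.Rollback(ctx)
		return err
	}
}
```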

Closes #22615

Release note: None

74662: tsdb: expand mem per worker based on sql pool size r=dhartunian a=dhartunian

Previously, the memory limit for all `tsdb` workers was set at a static
64MiB. This cap created issues, seen in #24018, where the limit was hit
on a 30-node cluster. To alleviate the issue, the number of workers was
reduced, raising the per-worker allocation.

We have now hit this limit again as part of load testing with larger
clusters and have decided to make the per-worker query memory limit
dynamic. The per-worker limit is now raised based on the amount of memory
available to the SQL Pool via the `MemoryPoolSize` configuration
variable. This is set to 25% of the system memory by default. The
per-worker `tsdb` memory cap is now doubled until it reaches `1/128` of
the memory pool setting.

For example, on a node with 128-256 GiB of memory, this will
correspond to 512 MiB allocated for all running `tsdb` queries.
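The sizing rule can be read roughly as the sketch below (a hypothetical
helper, not the code in this PR): start from the 64MiB default and keep
doubling while the result stays at or below 1/128 of the SQL memory pool.

```go
// maxTSDBMemory sketches the per-worker sizing rule described above.
// Illustrative only; names are assumptions.
func maxTSDBMemory(sqlPoolSize int64) int64 {
	const defaultQueryMemoryMax int64 = 64 << 20 // 64 MiB
	limit := sqlPoolSize / 128                   // 1/128 of the SQL memory pool
	mem := defaultQueryMemoryMax
	for mem*2 <= limit {
		mem *= 2
	}
	return mem
}
```

With the default 25% SQL pool on a 256 GiB node (a 64 GiB pool), this yields
512 MiB, consistent with the example above.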

In addition, the ts server is now connected to the same `BytesMonitor`
instance as the SQL memory monitor, and workers will be capped at double
the query limit. Results are monitored as before, but no cap is
introduced there since none was present previously.

This behavior is gated behind a private cluster setting that's enabled
by default and sets the ratio at 1/128 of the SQL memory pool.

Resolves #72986

Release note (ops change): customers running clusters with 240 nodes or
more can effectively access tsdb metrics.

75677: randgen: add PopulateRandTable r=mgartner a=msbutler

PopulateRandTable populates the caller's table with random data. This helper
function aims to make it easier for engineers to develop randomized tests that
leverage randgen / sqlsmith.

Informs #72345

Release note: None

76334: opt: fix missing filters after join reordering r=mgartner a=mgartner

#### opt: add TES, SES, and rules to reorderjoins

This commit updates the output of the `reorderjoins` opt test command to
display the initial state of the `JoinOrderBuilder`. It adds the TES, SES,
and conflict rules for each edge to the output.

Release note: None

#### opt: fix missing filters after join reordering

This commit eliminates logic in the `assoc`, `leftAsscom`, and
`rightAsscom` functions in the join order builder that aimed to prevent
generating "orphaned" predicates, where one or more referenced relations
are not in a join's input. In rare cases, this logic had the side effect
of creating invalid conflict rules for edges, which could prevent valid
predicates from being added to reordered join trees.

It is safe to remove these conditionals because they are unnecessary.
The CD-C algorithm already prevents generation of orphaned predicates by
checking that the total eligibility set (TES) is a subset of a join's
input vertices. In our implementation, this is handled by the
`checkNonInnerJoin` and `checkInnerJoin` functions.
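As a rough illustration of that containment check (simplified; not the actual
`JoinOrderBuilder` code):

```go
// vertexSet is a bitset of base relations, standing in for the join order
// builder's internal representation. Simplified illustration only.
type vertexSet uint64

func (s vertexSet) isSubsetOf(other vertexSet) bool { return s&^other == 0 }

// innerJoinEdgeApplicable sketches the idea behind checkInnerJoin: an edge
// (predicate) may be applied to a join only if it references relations on
// both sides and its TES is contained in the join's combined inputs. The
// last condition is what prevents orphaned predicates.
func innerJoinEdgeApplicable(tes, left, right vertexSet) bool {
	return tes&left != 0 && tes&right != 0 && tes.isSubsetOf(left|right)
}
```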

Fixes #76522

Release note (bug fix): A bug has been fixed that caused the query optimizer
to omit join filters in rare cases when reordering joins, which could
result in incorrect query results. This bug had been present since v20.2.


Co-authored-by: Lidor Carmel <[email protected]>
Co-authored-by: David Hartunian <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
5 people committed Feb 15, 2022
5 parents dd4fa26 + a098962 + 97150df + 6236b0a + 66d8865 commit 9132eac
Showing 32 changed files with 2,479 additions and 1,652 deletions.
15 changes: 15 additions & 0 deletions pkg/cli/cliflags/flags.go
@@ -147,6 +147,21 @@ percentage of physical memory (e.g. .25). If left unspecified, defaults to 25% o
physical memory.`,
}

TSDBMem = FlagInfo{
Name: "max-tsdb-memory",
Description: `
Maximum memory capacity available to store temporary data for use by the
time-series database to display metrics in the DB Console. Accepts numbers
interpreted as bytes, size suffixes (e.g. 1GB and 1GiB) or a
percentage of physical memory (e.g. 0.01). If left unspecified, defaults to
1% of physical memory or 64MiB, whichever is greater. It may be necessary to
manually increase this value on a cluster with hundreds of nodes where
individual nodes have very limited memory available. This can constrain
the ability of the DB Console to process time-series queries used to render
metrics for the entire cluster. This capacity constraint does not affect
SQL query execution.`,
}

SQLTempStorage = FlagInfo{
Name: "max-disk-temp-storage",
Description: `
11 changes: 11 additions & 0 deletions pkg/cli/context.go
@@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/ts"
"github.com/cockroachdb/cockroach/pkg/util/log/logconfig"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -128,6 +129,16 @@ func setServerContextDefaults() {
if bytes, _ := memoryPercentResolver(25); bytes != 0 {
serverCfg.SQLConfig.MemoryPoolSize = bytes
}

// Attempt to set serverCfg.TimeSeriesServerConfig.QueryMemoryMax to
// the default (64MiB) or 1% of system memory, whichever is greater.
if bytes, _ := memoryPercentResolver(1); bytes != 0 {
if bytes > ts.DefaultQueryMemoryMax {
serverCfg.TimeSeriesServerConfig.QueryMemoryMax = bytes
} else {
serverCfg.TimeSeriesServerConfig.QueryMemoryMax = ts.DefaultQueryMemoryMax
}
}
}

// baseCfg points to the base.Config inside serverCfg.
2 changes: 2 additions & 0 deletions pkg/cli/flags.go
@@ -526,6 +526,7 @@ func init() {
// Engine flags.
varFlag(f, cacheSizeValue, cliflags.Cache)
varFlag(f, sqlSizeValue, cliflags.SQLMem)
varFlag(f, tsdbSizeValue, cliflags.TSDBMem)
// N.B. diskTempStorageSizeValue.ResolvePercentage() will be called after
// the stores flag has been parsed and the storage device that a percentage
// refers to becomes known.
@@ -986,6 +987,7 @@ func init() {

// Engine flags.
varFlag(f, sqlSizeValue, cliflags.SQLMem)
varFlag(f, tsdbSizeValue, cliflags.TSDBMem)
// N.B. diskTempStorageSizeValue.ResolvePercentage() will be called after
// the stores flag has been parsed and the storage device that a percentage
// refers to becomes known.
88 changes: 49 additions & 39 deletions pkg/cli/flags_test.go
@@ -140,51 +140,61 @@ func TestClusterNameFlag(t *testing.T) {
}
}

func TestSQLMemoryPoolFlagValue(t *testing.T) {
func TestMemoryPoolFlagValues(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Avoid leaking configuration changes after the test ends.
defer initCLIDefaults()

f := startCmd.Flags()

// Check absolute values.
testCases := []struct {
value string
expected int64
for _, tc := range []struct {
flag string
config *int64
}{
{"100MB", 100 * 1000 * 1000},
{".5GiB", 512 * 1024 * 1024},
{"1.3", 1},
}
for _, c := range testCases {
args := []string{"--max-sql-memory", c.value}
if err := f.Parse(args); err != nil {
t.Fatal(err)
}
if c.expected != serverCfg.MemoryPoolSize {
t.Errorf("expected %d, but got %d", c.expected, serverCfg.MemoryPoolSize)
}
}
{flag: "--max-sql-memory", config: &serverCfg.MemoryPoolSize},
{flag: "--max-tsdb-memory", config: &serverCfg.TimeSeriesServerConfig.QueryMemoryMax},
} {
t.Run(tc.flag, func(t *testing.T) {
// Avoid leaking configuration changes after the test ends.
defer initCLIDefaults()

f := startCmd.Flags()

// Check absolute values.
testCases := []struct {
value string
expected int64
}{
{"100MB", 100 * 1000 * 1000},
{".5GiB", 512 * 1024 * 1024},
{"1.3", 1},
}
for _, c := range testCases {
args := []string{tc.flag, c.value}
if err := f.Parse(args); err != nil {
t.Fatal(err)
}
if c.expected != *tc.config {
t.Errorf("expected %d, but got %d", c.expected, tc.config)
}
}

for _, c := range []string{".30", "0.3"} {
args := []string{"--max-sql-memory", c}
if err := f.Parse(args); err != nil {
t.Fatal(err)
}
for _, c := range []string{".30", "0.3"} {
args := []string{tc.flag, c}
if err := f.Parse(args); err != nil {
t.Fatal(err)
}

// Check fractional values.
maxMem, err := status.GetTotalMemory(context.Background())
if err != nil {
t.Logf("total memory unknown: %v", err)
return
}
expectedLow := (maxMem * 28) / 100
expectedHigh := (maxMem * 32) / 100
if serverCfg.MemoryPoolSize < expectedLow || serverCfg.MemoryPoolSize > expectedHigh {
t.Errorf("expected %d-%d, but got %d", expectedLow, expectedHigh, serverCfg.MemoryPoolSize)
}
// Check fractional values.
maxMem, err := status.GetTotalMemory(context.Background())
if err != nil {
t.Logf("total memory unknown: %v", err)
return
}
expectedLow := (maxMem * 28) / 100
expectedHigh := (maxMem * 32) / 100
if *tc.config < expectedLow || *tc.config > expectedHigh {
t.Errorf("expected %d-%d, but got %d", expectedLow, expectedHigh, *tc.config)
}
}
})
}
}

7 changes: 4 additions & 3 deletions pkg/cli/start.go
@@ -185,6 +185,7 @@ func initTraceDir(ctx context.Context, dir string) {
var cacheSizeValue = newBytesOrPercentageValue(&serverCfg.CacheSize, memoryPercentResolver)
var sqlSizeValue = newBytesOrPercentageValue(&serverCfg.MemoryPoolSize, memoryPercentResolver)
var diskTempStorageSizeValue = newBytesOrPercentageValue(nil /* v */, nil /* percentResolver */)
var tsdbSizeValue = newBytesOrPercentageValue(&serverCfg.TimeSeriesServerConfig.QueryMemoryMax, memoryPercentResolver)

func initExternalIODir(ctx context.Context, firstStore base.StoreSpec) (string, error) {
externalIODir := startCtx.externalIODir
@@ -1101,12 +1102,12 @@ func maybeWarnMemorySizes(ctx context.Context) {

// Check that the total suggested "max" memory is well below the available memory.
if maxMemory, err := status.GetTotalMemory(ctx); err == nil {
requestedMem := serverCfg.CacheSize + serverCfg.MemoryPoolSize
requestedMem := serverCfg.CacheSize + serverCfg.MemoryPoolSize + serverCfg.TimeSeriesServerConfig.QueryMemoryMax
maxRecommendedMem := int64(.75 * float64(maxMemory))
if requestedMem > maxRecommendedMem {
log.Ops.Shoutf(ctx, severity.WARNING,
"the sum of --max-sql-memory (%s) and --cache (%s) is larger than 75%% of total RAM (%s).\nThis server is running at increased risk of memory-related failures.",
sqlSizeValue, cacheSizeValue, humanizeutil.IBytes(maxRecommendedMem))
"the sum of --max-sql-memory (%s), --cache (%s), and --max-tsdb-memory (%s) is larger than 75%% of total RAM (%s).\nThis server is running at increased risk of memory-related failures.",
sqlSizeValue, cacheSizeValue, tsdbSizeValue, humanizeutil.IBytes(maxRecommendedMem))
}
}
}
3 changes: 0 additions & 3 deletions pkg/internal/sqlsmith/setup.go
@@ -103,9 +103,6 @@ func randTablesN(r *rand.Rand, n int) string {
sb.WriteString(stmt.String())
sb.WriteString(";\n")
}

// TODO(mjibson): add random INSERTs.

return sb.String()
}

12 changes: 12 additions & 0 deletions pkg/kv/db.go
@@ -851,6 +851,18 @@ func (db *DB) NewTxn(ctx context.Context, debugName string) *Txn {
// from recoverable internal errors, and is automatically committed
// otherwise. The retryable function should have no side effects which could
// cause problems in the event it must be run more than once.
// For example:
// err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// if kv, err := txn.Get(ctx, key); err != nil {
// return err
// }
// // ...
// return nil
// })
// Note that once the transaction encounters a retryable error, the txn object
// is marked as poisoned and all future ops fail fast until the retry. The
// callback may return either nil or the retryable error. Txn is responsible for
// resetting the transaction and retrying the callback.
func (db *DB) Txn(ctx context.Context, retryable func(context.Context, *Txn) error) error {
// TODO(radu): we should open a tracing Span here (we need to figure out how
// to use the correct tracer).
82 changes: 82 additions & 0 deletions pkg/kv/db_test.go
@@ -13,6 +13,7 @@ package kv_test
import (
"bytes"
"context"
"fmt"
"testing"
"time"

@@ -715,3 +716,84 @@ func TestGenerateForcedRetryableError(t *testing.T) {
require.True(t, errors.As(err, &retryErr))
require.Equal(t, 1, int(retryErr.Transaction.Epoch))
}

// Get a retryable error within a db.Txn transaction and verify the retry
// succeeds. We are verifying the behavior is the same whether the retryable
// callback returns the retryable error or returns nil. Both implementations are
// legal - returning early (with either nil or the error) after a retryable
// error is optional.
func TestDB_TxnRetry(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
s, db := setup(t)
defer s.Stopper().Stop(context.Background())

testutils.RunTrueAndFalse(t, "returnNil", func(t *testing.T, returnNil bool) {
keyA := fmt.Sprintf("a_return_nil_%t", returnNil)
keyB := fmt.Sprintf("b_return_nil_%t", returnNil)
runNumber := 0
err := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
require.NoError(t, txn.Put(ctx, keyA, "1"))
require.NoError(t, txn.Put(ctx, keyB, "1"))

{
// High priority txn - will abort the other txn.
hpTxn := kv.NewTxn(ctx, db, 0)
require.NoError(t, hpTxn.SetUserPriority(roachpb.MaxUserPriority))
// Only write if we have not written before, because otherwise we will keep aborting
// the other txn forever.
r, err := hpTxn.Get(ctx, keyA)
require.NoError(t, err)
if !r.Exists() {
require.Zero(t, runNumber)
require.NoError(t, hpTxn.Put(ctx, keyA, "hp txn"))
require.NoError(t, hpTxn.Commit(ctx))
} else {
// We already wrote to keyA, meaning this is a retry, no need to write again.
require.Equal(t, 1, runNumber)
require.NoError(t, hpTxn.Rollback(ctx))
}
}

// Read, so that we'll get a retryable error.
r, err := txn.Get(ctx, keyA)
if runNumber == 0 {
// First run, we should get a retryable error.
require.Zero(t, runNumber)
require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, err)
require.Equal(t, []byte(nil), r.ValueBytes())

// At this point txn is poisoned, and any op returns the same (poisoning) error.
r, err = txn.Get(ctx, keyB)
require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, err)
require.Equal(t, []byte(nil), r.ValueBytes())
} else {
// The retry should succeed.
require.Equal(t, 1, runNumber)
require.NoError(t, err)
require.Equal(t, []byte("1"), r.ValueBytes())
}
runNumber++

if returnNil {
return nil
}
// Return the retryable error.
return err
})
require.NoError(t, err)
require.Equal(t, 2, runNumber)

err1 := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
// The high priority txn was overwritten by the successful retry.
kv, e1 := txn.Get(ctx, keyA)
require.NoError(t, e1)
require.Equal(t, []byte("1"), kv.ValueBytes())
kv, e2 := txn.Get(ctx, keyB)
require.NoError(t, e2)
require.Equal(t, []byte("1"), kv.ValueBytes())
return nil
})
require.NoError(t, err1)
})
}
5 changes: 5 additions & 0 deletions pkg/kv/kvclient/kvcoord/testdata/savepoints
@@ -486,6 +486,11 @@ savepoint x
abort
----
(*roachpb.TransactionRetryWithProtoRefreshError)
txn id not changed

reset
----
txn error cleared
txn id changed

release x