75261: row: fetcher cleanup and improvements r=RaduBerinde a=RaduBerinde

#### rowenc: remove deprecated return from DecodeIndexKey

Release note: None
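
For orientation, a minimal sketch of the new call shape for the system-table decoders touched by this change. The package and helper name (`decodeFirstKeyCol`) are hypothetical, the snippet only compiles inside the CockroachDB repo, and the old four-value form is shown in a comment for contrast:

```go
package example // hypothetical package, for illustration only

import (
	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
	"github.com/cockroachdb/cockroach/pkg/sql/types"
	"github.com/cockroachdb/errors"
)

// decodeFirstKeyCol decodes the first key column of a system-table index key,
// mirroring the call sites updated below (settingswatcher, span config
// decoder, zones decoder).
func decodeFirstKeyCol(colType *types.T, key roachpb.Key) (rowenc.EncDatum, error) {
	row := make([]rowenc.EncDatum, 1)
	// Old: _, matches, _, err := rowenc.DecodeIndexKey(...), followed by an
	//      explicit `if !matches` check.
	// New: the deprecated `matches` return is gone; an error is the only
	//      failure signal.
	_, _, err := rowenc.DecodeIndexKey(
		keys.SystemSQLCodec, []*types.T{colType}, row, nil /* colDirs */, key,
	)
	if err != nil {
		return rowenc.EncDatum{}, errors.Wrapf(err, "failed to decode key: %v", key)
	}
	return row[0], nil
}
```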

#### row: minor cleanup around foundNull

This moves some code around to make it clear that foundNull is only relevant
in a specific case.

Release note: None

#### row: clean up ReadIndexKey

Rename ReadIndexKey to DecodeIndexKey and remove the return value that is no
longer useful.

Release note: None
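
As a hedged sketch of the caller-side effect, modeled on the pkg/sql/delete_range.go hunk below (the helper name `rowPrefix` is made up and the code only compiles inside the repo):

```go
package example // hypothetical package, for illustration only

import "github.com/cockroachdb/cockroach/pkg/sql/row"

// rowPrefix returns the index-key prefix of the row that keyBytes belongs to.
func rowPrefix(rf *row.Fetcher, keyBytes []byte) ([]byte, error) {
	// Old: after, ok, _, err := rf.ReadIndexKey(keyBytes), followed by an
	//      `if !ok` assertion failure.
	// New: the boolean is gone; a key that does not decode surfaces as an error.
	after, _, err := rf.DecodeIndexKey(keyBytes)
	if err != nil {
		return nil, err
	}
	// Everything up to the remaining bytes is the row's index-key prefix.
	return keyBytes[:len(keyBytes)-len(after)], nil
}
```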

#### row: more Fetcher cleanup

 - improve the comment for `indexKey`;
 - unexport NextKey;
 - slightly change the return value of nextKey to simplify the logic
   (the only semantic difference is the value returned by the first call,
   which is not used);
 - use numKeysPerRow instead of counting the total number of families;
   this enables the faster path in more cases (see the sketch below).

Release note: None
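
A distilled, self-contained sketch of the new nextKey contract, using toy types. This is not the actual implementation (which lives in pkg/sql/row/fetcher.go, see the diff below) and it omits the key decoding and the unique-index NULL handling; it only illustrates the changed return value and the numKeysPerRow fast path:

```go
package sketch // toy stand-in, for illustration only

import "bytes"

// fetcherSketch keeps only the fields relevant to the new contract.
type fetcherSketch struct {
	// indexKey is the index-key prefix of the current row, up to (and not
	// including) any family ID.
	indexKey      []byte
	numKeysPerRow int
}

// nextKey reports whether key starts a new row, as opposed to being another
// column family of the row currently being decoded. The previous, exported
// NextKey reported "row done" instead; the only semantic difference is the
// value returned by the first call, which callers do not use.
func (f *fetcherSketch) nextKey(key, rowPrefix []byte) bool {
	// Fast path: when numKeysPerRow == 1 a row can never span multiple KV
	// pairs, so every key starts a new row and the prefix comparison is
	// skipped entirely. numKeysPerRow can be 1 even when the table has several
	// column families (e.g. a secondary index encoded as a single KV per row),
	// so this fires in more cases than the old len(families) == 1 check.
	if f.numKeysPerRow > 1 && f.indexKey != nil && bytes.HasPrefix(key, f.indexKey) {
		// Same prefix as the previous key: another family of the same row.
		return false
	}
	// New row. (The real code clears indexKey here and re-derives it while
	// processing the row's first KV.)
	f.indexKey = rowPrefix
	return true
}
```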


75272: kvserver: de-flake TestStoreSplitRangeLookupRace r=irfansharif a=irfansharif

Fixes #75198. This test was a bit brittle in expecting only one kind of
range lookup request in a testing filter -- it was always possible to
intercept a ReverseScanRequest, and after enabling span configs
(#73876), we now have an internal query ("validate-span-cfgs") that
makes use of it. See #75198 for more details.

Release note: None
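
For reference, a sketch of the filter-side check after the fix, modeled on the client_split_test.go hunk below. The helper name and exact parameter types are illustrative, not part of the patch:

```go
package example // hypothetical package, for illustration only

import (
	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// isBlockedRangeLookup mirrors the check inside the test's filter. The old
// version type-asserted the first request to *roachpb.ScanRequest, which
// panics when the lookup arrives as a ReverseScanRequest (as the internal
// "validate-span-cfgs" query does); reading the key through the request
// header works for either request type.
func isBlockedRangeLookup(ba roachpb.BatchRequest, start roachpb.RKey) bool {
	return kv.TestingIsRangeLookup(ba) &&
		ba.Requests[0].GetInner().Header().Key.Equal(start.AsRawKey())
}
```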

75281: sql: disable span-config on flaky 5node tests r=cucaroach a=cucaroach

Temporary fix for #72802 and 5node/distsql_enum CI failures.

Due to #73876, these tests have become flaky.
With the disable-span-configs option, 70 runs of `make stress` on the opt logic tests pass.

Release note: None


Co-authored-by: Radu Berinde <[email protected]>
Co-authored-by: irfan sharif <[email protected]>
Co-authored-by: Tommy Reilly <[email protected]>
4 people committed Jan 21, 2022
4 parents 1f819bb + a6f5016 + 38a65a9 + 422e00e commit fdfd893
Showing 14 changed files with 80 additions and 139 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_split_test.go
@@ -3018,7 +3018,7 @@ func TestStoreSplitRangeLookupRace(t *testing.T) {
select {
case <-blockRangeLookups:
if kv.TestingIsRangeLookup(ba) &&
ba.Requests[0].GetInner().(*roachpb.ScanRequest).Key.Equal(bounds.Key.AsRawKey()) {
ba.Requests[0].GetInner().Header().Key.Equal(bounds.Key.AsRawKey()) {

select {
case rangeLookupIsBlocked <- struct{}{}:
5 changes: 1 addition & 4 deletions pkg/server/settingswatcher/row_decoder.go
@@ -57,13 +57,10 @@ func (d *RowDecoder) DecodeRow(
{
types := []*types.T{d.columns[0].GetType()}
nameRow := make([]rowenc.EncDatum, 1)
_, matches, _, err := rowenc.DecodeIndexKey(d.codec, types, nameRow, nil, kv.Key)
_, _, err := rowenc.DecodeIndexKey(d.codec, types, nameRow, nil, kv.Key)
if err != nil {
return "", RawValue{}, false, errors.Wrap(err, "failed to decode key")
}
if !matches {
return "", RawValue{}, false, errors.Errorf("unexpected non-settings KV with settings prefix: %v", kv.Key)
}
if err := nameRow[0].EnsureDecoded(types[0], &d.alloc); err != nil {
return "", RawValue{}, false, err
}
8 changes: 1 addition & 7 deletions pkg/spanconfig/spanconfigkvsubscriber/span_config_decoder.go
@@ -47,16 +47,10 @@ func (sd *spanConfigDecoder) decode(kv roachpb.KeyValue) (entry roachpb.SpanConf
{
types := []*types.T{sd.columns[0].GetType()}
startKeyRow := make([]rowenc.EncDatum, 1)
_, matches, _, err := rowenc.DecodeIndexKey(keys.SystemSQLCodec, types, startKeyRow, nil /* colDirs */, kv.Key)
_, _, err := rowenc.DecodeIndexKey(keys.SystemSQLCodec, types, startKeyRow, nil /* colDirs */, kv.Key)
if err != nil {
return roachpb.SpanConfigEntry{}, errors.Wrapf(err, "failed to decode key: %v", kv.Key)
}
if !matches {
return roachpb.SpanConfigEntry{},
errors.AssertionFailedf(
"system.span_configurations descriptor does not match key: %v", kv.Key,
)
}
if err := startKeyRow[0].EnsureDecoded(types[0], &sd.alloc); err != nil {
return roachpb.SpanConfigEntry{}, err
}
8 changes: 4 additions & 4 deletions pkg/spanconfig/spanconfigsqlwatcher/zonesdecoder.go
@@ -42,14 +42,14 @@ func (zd *zonesDecoder) DecodePrimaryKey(key roachpb.Key) (descpb.ID, error) {
tbl := systemschema.ZonesTable
types := []*types.T{tbl.PublicColumns()[0].GetType()}
startKeyRow := make([]rowenc.EncDatum, 1)
_, matches, _, err := rowenc.DecodeIndexKey(
_, _, err := rowenc.DecodeIndexKey(
zd.codec, types, startKeyRow, nil /* colDirs */, key,
)
if err != nil || !matches {
return descpb.InvalidID, errors.AssertionFailedf("failed to decode key in system.zones %v", key)
if err != nil {
return descpb.InvalidID, errors.NewAssertionErrorWithWrappedErrf(err, "failed to decode key in system.zones %v", key)
}
if err := startKeyRow[0].EnsureDecoded(types[0], &zd.alloc); err != nil {
return descpb.InvalidID, errors.AssertionFailedf("failed to decode key in system.zones %v", key)
return descpb.InvalidID, errors.NewAssertionErrorWithWrappedErrf(err, "failed to decode key in system.zones %v", key)
}
descID := descpb.ID(tree.MustBeDInt(startKeyRow[0].Datum))
return descID, nil
5 changes: 1 addition & 4 deletions pkg/sql/delete_range.go
@@ -186,13 +186,10 @@ func (d *deleteRangeNode) processResults(
continue
}

after, ok, _, err := d.fetcher.ReadIndexKey(keyBytes)
after, _, err := d.fetcher.DecodeIndexKey(keyBytes)
if err != nil {
return nil, err
}
if !ok {
return nil, errors.AssertionFailedf("key did not match descriptor")
}
k := keyBytes[:len(keyBytes)-len(after)]
if !bytes.Equal(k, prev) {
prev = k
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/distsql_enum
@@ -1,4 +1,5 @@
# LogicTest: 5node-default-configs !5node-metadata
# cluster-opt: disable-span-configs

# Regression test for nested tuple enum hydration (#74189)
statement ok
@@ -1,4 +1,5 @@
# LogicTest: 5node
# cluster-opt: disable-span-configs

statement ok
CREATE TABLE geo_table(
149 changes: 57 additions & 92 deletions pkg/sql/row/fetcher.go
@@ -193,8 +193,10 @@ type Fetcher struct {

// -- Fields updated during a scan --

kvFetcher *KVFetcher
indexKey []byte // the index key of the current row
kvFetcher *KVFetcher
// indexKey stores the index key of the current row, up to (and not including)
// any family ID.
indexKey []byte
prettyValueBuf *bytes.Buffer

valueColsFound int // how many needed cols we've found so far in the value
@@ -590,8 +592,9 @@ func (rf *Fetcher) StartScanFrom(ctx context.Context, f KVBatchFetcher, traceKV
rf.kvFetcher.Close(ctx)
}
rf.kvFetcher = newKVFetcher(f)
rf.kvEnd = false
// Retrieve the first key.
_, err := rf.NextKey(ctx)
_, err := rf.nextKey(ctx)
return err
}

@@ -618,48 +621,64 @@ func (rf *Fetcher) setNextKV(kv roachpb.KeyValue, needsCopy bool) {
rf.kv = kvCopy
}

// NextKey retrieves the next key/value and sets kv/kvEnd. Returns whether a row
// has been completed.
func (rf *Fetcher) NextKey(ctx context.Context) (rowDone bool, _ error) {
moreKVs, kv, finalReferenceToBatch, err := rf.kvFetcher.NextKV(ctx, rf.mvccDecodeStrategy)
// nextKey retrieves the next key/value and sets kv/kvEnd. Returns whether the
// key indicates a new row (as opposed to another family for the current row).
func (rf *Fetcher) nextKey(ctx context.Context) (newRow bool, _ error) {
ok, kv, finalReferenceToBatch, err := rf.kvFetcher.NextKV(ctx, rf.mvccDecodeStrategy)
if err != nil {
return false, ConvertFetchError(ctx, rf, err)
}
rf.setNextKV(kv, finalReferenceToBatch)

rf.kvEnd = !moreKVs
if rf.kvEnd {
if !ok {
// No more keys in the scan.
//
// NB: this assumes that the KV layer will never split a range
// between column families, which is a brittle assumption.
// See:
// https://github.com/cockroachdb/cockroach/pull/42056
rf.kvEnd = true
return true, nil
}

// foundNull is set when decoding a new index key for a row finds a NULL value
// in the index key. This is used when decoding unique secondary indexes in order
// to tell whether they have extra columns appended to the key.
var foundNull bool

// unchangedPrefix will be set to true if we can skip decoding the index key
// completely, because the last key we saw has identical prefix to the
// current key.
//
// See Init() for a detailed description of when we can get away with not
// reading the index key.
unchangedPrefix := rf.indexKey != nil && bytes.HasPrefix(rf.kv.Key, rf.indexKey)
// unchangedPrefix will be set to true if the current KV belongs to the same
// row as the previous KV (i.e. the last and current keys have identical
// prefix). In this case, we can skip decoding the index key completely.
unchangedPrefix := rf.numKeysPerRow > 1 && rf.indexKey != nil && bytes.HasPrefix(rf.kv.Key, rf.indexKey)
if unchangedPrefix {
// Skip decoding!
rf.keyRemainingBytes = rf.kv.Key[len(rf.indexKey):]
} else if rf.mustDecodeIndexKey {
rf.keyRemainingBytes, moreKVs, foundNull, err = rf.ReadIndexKey(rf.kv.Key)
return false, nil
}

// The current key belongs to a new row.
if rf.mustDecodeIndexKey {
var foundNull bool
rf.keyRemainingBytes, foundNull, err = rf.DecodeIndexKey(rf.kv.Key)
if err != nil {
return false, err
}
if !moreKVs {
return false, errors.AssertionFailedf("key did not match any of the table descriptors")
// For unique secondary indexes, the index-key does not distinguish one row
// from the next if both rows contain identical values along with a NULL.
// Consider the keys:
//
// /test/unique_idx/NULL/0
// /test/unique_idx/NULL/1
//
// The index-key extracted from the above keys is /test/unique_idx/NULL. The
// trailing /0 and /1 are the primary key used to unique-ify the keys when a
// NULL is present. When a null is present in the index key, we cut off more
// of the index key so that the prefix includes the primary key columns.
//
// Note that we do not need to do this for non-unique secondary indexes because
// the extra columns in the primary key will _always_ be there, so we can decode
// them when processing the index. The difference with unique secondary indexes
// is that the extra columns are not always there, and are used to unique-ify
// the index key, rather than provide the primary key column values.
if foundNull && rf.table.isSecondaryIndex && rf.table.index.IsUnique() && rf.numKeysPerRow > 1 {
for i := 0; i < rf.table.index.NumKeySuffixColumns(); i++ {
var err error
// Slice off an extra encoded column from rf.keyRemainingBytes.
rf.keyRemainingBytes, err = keyside.Skip(rf.keyRemainingBytes)
if err != nil {
return false, err
}
}
}
} else {
// We still need to consume the key until the family
@@ -673,54 +692,8 @@ func (rf *Fetcher) NextKey(ctx context.Context) (rowDone bool, _ error) {
rf.keyRemainingBytes = rf.kv.Key[prefixLen:]
}

// For unique secondary indexes, the index-key does not distinguish one row
// from the next if both rows contain identical values along with a NULL.
// Consider the keys:
//
// /test/unique_idx/NULL/0
// /test/unique_idx/NULL/1
//
// The index-key extracted from the above keys is /test/unique_idx/NULL. The
// trailing /0 and /1 are the primary key used to unique-ify the keys when a
// NULL is present. When a null is present in the index key, we cut off more
// of the index key so that the prefix includes the primary key columns.
//
// Note that we do not need to do this for non-unique secondary indexes because
// the extra columns in the primary key will _always_ be there, so we can decode
// them when processing the index. The difference with unique secondary indexes
// is that the extra columns are not always there, and are used to unique-ify
// the index key, rather than provide the primary key column values.
if foundNull && rf.table.isSecondaryIndex && rf.table.index.IsUnique() && len(rf.table.desc.GetFamilies()) != 1 {
for i := 0; i < rf.table.index.NumKeySuffixColumns(); i++ {
var err error
// Slice off an extra encoded column from rf.keyRemainingBytes.
rf.keyRemainingBytes, err = keyside.Skip(rf.keyRemainingBytes)
if err != nil {
return false, err
}
}
}

switch {
case len(rf.table.desc.GetFamilies()) == 1:
// If we only have one family, we know that there is only 1 k/v pair per row.
rowDone = true
case !unchangedPrefix:
// If the prefix of the key has changed, current key is from a different
// row than the previous one.
rowDone = true
default:
rowDone = false
}

if rf.indexKey != nil && rowDone {
// The current key belongs to a new row. Output the
// current row.
rf.indexKey = nil
return true, nil
}

return false, nil
rf.indexKey = nil
return true, nil
}

func (rf *Fetcher) prettyEncDatums(types []*types.T, vals []rowenc.EncDatum) string {
@@ -736,23 +709,15 @@ func (rf *Fetcher) prettyEncDatums(types []*types.T, vals []rowenc.EncDatum) str
return buf.String()
}

// ReadIndexKey decodes an index key for a given table.
// It returns whether or not the key is for any of the tables initialized
// in Fetcher, and the remaining part of the key if it is.
// ReadIndexKey additionally returns whether or not it encountered a null while decoding.
func (rf *Fetcher) ReadIndexKey(
key roachpb.Key,
) (remaining []byte, ok bool, foundNull bool, err error) {
remaining, foundNull, err = rowenc.DecodeKeyVals(
// DecodeIndexKey decodes an index key and returns the remaining key and whether
// it encountered a null while decoding.
func (rf *Fetcher) DecodeIndexKey(key roachpb.Key) (remaining []byte, foundNull bool, err error) {
return rowenc.DecodeKeyVals(
rf.table.keyValTypes,
rf.table.keyVals,
rf.table.indexColumnDirs,
key[rf.table.knownPrefixLength:],
)
if err != nil {
return nil, false, false, err
}
return remaining, true, foundNull, nil
}

// KeyToDesc implements the KeyToDescTranslator interface. The implementation is
@@ -761,7 +726,7 @@ func (rf *Fetcher) KeyToDesc(key roachpb.Key) (catalog.TableDescriptor, bool) {
if len(key) < rf.table.knownPrefixLength {
return nil, false
}
if _, ok, _, err := rf.ReadIndexKey(key); !ok || err != nil {
if _, _, err := rf.DecodeIndexKey(key); err != nil {
return nil, false
}
return rf.table.desc, true
@@ -1095,7 +1060,7 @@ func (rf *Fetcher) NextRow(ctx context.Context) (row rowenc.EncDatumRow, err err
log.VEventf(ctx, 2, "fetched: %s -> %s", prettyKey, prettyVal)
}

rowDone, err := rf.NextKey(ctx)
rowDone, err := rf.nextKey(ctx)
if err != nil {
return nil, err
}
11 changes: 4 additions & 7 deletions pkg/sql/row/kv_fetcher.go
@@ -164,7 +164,7 @@
// unexpectedly.
func (f *KVFetcher) NextKV(
ctx context.Context, mvccDecodeStrategy MVCCDecodingStrategy,
) (moreKVs bool, kv roachpb.KeyValue, finalReferenceToBatch bool, err error) {
) (ok bool, kv roachpb.KeyValue, finalReferenceToBatch bool, err error) {
for {
// Only one of f.kvs or f.batchResponse will be set at a given time. Which
// one is set depends on the format returned by a given BatchRequest.
@@ -206,12 +206,9 @@ func (f *KVFetcher) NextKV(
}, lastKey, nil
}

moreKVs, f.kvs, f.batchResponse, err = f.nextBatch(ctx)
if err != nil {
return moreKVs, kv, false, err
}
if !moreKVs {
return false, kv, false, nil
ok, f.kvs, f.batchResponse, err = f.nextBatch(ctx)
if err != nil || !ok {
return ok, kv, false, err
}
f.newSpan = true
nBytes := len(f.batchResponse)
1 change: 0 additions & 1 deletion pkg/sql/rowenc/BUILD.bazel
@@ -71,7 +71,6 @@ go_test(
"//pkg/util/randutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_apd_v3//:apd",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
13 changes: 6 additions & 7 deletions pkg/sql/rowenc/index_encoding.go
@@ -464,28 +464,27 @@ func DecodeIndexKeyPrefix(
// The remaining bytes in the index key are returned which will either be an
// encoded column ID for the primary key index, the primary key suffix for
// non-unique secondary indexes or unique secondary indexes containing NULL or
// empty. If the given descriptor does not match the key, false is returned with
// no error.
// empty.
func DecodeIndexKey(
codec keys.SQLCodec,
types []*types.T,
vals []EncDatum,
colDirs []descpb.IndexDescriptor_Direction,
key []byte,
) (remainingKey []byte, matches bool, foundNull bool, _ error) {
) (remainingKey []byte, foundNull bool, _ error) {
key, err := codec.StripTenantPrefix(key)
if err != nil {
return nil, false, false, err
return nil, false, err
}
key, _, _, err = DecodePartialTableIDIndexID(key)
if err != nil {
return nil, false, false, err
return nil, false, err
}
remainingKey, foundNull, err = DecodeKeyVals(types, vals, colDirs, key)
if err != nil {
return nil, false, false, err
return nil, false, err
}
return remainingKey, true, foundNull, nil
return remainingKey, foundNull, nil
}

// DecodeKeyVals decodes the values that are part of the key. The decoded
7 changes: 1 addition & 6 deletions pkg/sql/rowenc/index_encoding_test.go
@@ -31,7 +31,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/json"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

@@ -93,13 +92,9 @@ func decodeIndex(
}
values := make([]EncDatum, index.NumKeyColumns())
colDirs := index.IndexDesc().KeyColumnDirections
_, ok, _, err := DecodeIndexKey(codec, types, values, colDirs, key)
if err != nil {
if _, _, err := DecodeIndexKey(codec, types, values, colDirs, key); err != nil {
return nil, err
}
if !ok {
return nil, errors.Errorf("key did not match descriptor")
}

decodedValues := make([]tree.Datum, len(values))
var da tree.DatumAlloc
