Skip to content

Commit

Permalink
row: more Fetcher cleanup
Browse files Browse the repository at this point in the history
 - improve comment for `indexKey`;
 - unexport NextKey;
 - slightly change the return value of nextKey to simplify the logic
   (the semantic difference is what the first call returns, which is
   not used);
 - use numKeysPerRow instead of counting the total families; this
   enables the faster paths for more cases.

Release note: None
  • Loading branch information
RaduBerinde committed Jan 21, 2022
1 parent e659615 commit a6f5016
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 51 deletions.
69 changes: 25 additions & 44 deletions pkg/sql/row/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,10 @@ type Fetcher struct {

// -- Fields updated during a scan --

kvFetcher *KVFetcher
indexKey []byte // the index key of the current row
kvFetcher *KVFetcher
// indexKey stores the index key of the current row, up to (and not including)
// any family ID.
indexKey []byte
prettyValueBuf *bytes.Buffer

valueColsFound int // how many needed cols we've found so far in the value
Expand Down Expand Up @@ -590,8 +592,9 @@ func (rf *Fetcher) StartScanFrom(ctx context.Context, f KVBatchFetcher, traceKV
rf.kvFetcher.Close(ctx)
}
rf.kvFetcher = newKVFetcher(f)
rf.kvEnd = false
// Retrieve the first key.
_, err := rf.NextKey(ctx)
_, err := rf.nextKey(ctx)
return err
}

Expand All @@ -618,37 +621,33 @@ func (rf *Fetcher) setNextKV(kv roachpb.KeyValue, needsCopy bool) {
rf.kv = kvCopy
}

// NextKey retrieves the next key/value and sets kv/kvEnd. Returns whether a row
// has been completed.
func (rf *Fetcher) NextKey(ctx context.Context) (rowDone bool, _ error) {
moreKVs, kv, finalReferenceToBatch, err := rf.kvFetcher.NextKV(ctx, rf.mvccDecodeStrategy)
// nextKey retrieves the next key/value and sets kv/kvEnd. Returns whether the
// key indicates a new row (as opposed to another family for the current row).
func (rf *Fetcher) nextKey(ctx context.Context) (newRow bool, _ error) {
ok, kv, finalReferenceToBatch, err := rf.kvFetcher.NextKV(ctx, rf.mvccDecodeStrategy)
if err != nil {
return false, ConvertFetchError(ctx, rf, err)
}
rf.setNextKV(kv, finalReferenceToBatch)

rf.kvEnd = !moreKVs
if rf.kvEnd {
if !ok {
// No more keys in the scan.
//
// NB: this assumes that the KV layer will never split a range
// between column families, which is a brittle assumption.
// See:
// https://github.com/cockroachdb/cockroach/pull/42056
rf.kvEnd = true
return true, nil
}

// unchangedPrefix will be set to true if we can skip decoding the index key
// completely, because the last key we saw has identical prefix to the
// current key.
//
// See Init() for a detailed description of when we can get away with not
// reading the index key.
unchangedPrefix := rf.indexKey != nil && bytes.HasPrefix(rf.kv.Key, rf.indexKey)
// unchangedPrefix will be set to true if the current KV belongs to the same
// row as the previous KV (i.e. the last and current keys have identical
// prefix). In this case, we can skip decoding the index key completely.
unchangedPrefix := rf.numKeysPerRow > 1 && rf.indexKey != nil && bytes.HasPrefix(rf.kv.Key, rf.indexKey)
if unchangedPrefix {
// Skip decoding!
rf.keyRemainingBytes = rf.kv.Key[len(rf.indexKey):]
} else if rf.mustDecodeIndexKey {
return false, nil
}

// The current key belongs to a new row.
if rf.mustDecodeIndexKey {
var foundNull bool
rf.keyRemainingBytes, foundNull, err = rf.DecodeIndexKey(rf.kv.Key)
if err != nil {
Expand All @@ -671,7 +670,7 @@ func (rf *Fetcher) NextKey(ctx context.Context) (rowDone bool, _ error) {
// them when processing the index. The difference with unique secondary indexes
// is that the extra columns are not always there, and are used to unique-ify
// the index key, rather than provide the primary key column values.
if foundNull && rf.table.isSecondaryIndex && rf.table.index.IsUnique() && len(rf.table.desc.GetFamilies()) != 1 {
if foundNull && rf.table.isSecondaryIndex && rf.table.index.IsUnique() && rf.numKeysPerRow > 1 {
for i := 0; i < rf.table.index.NumKeySuffixColumns(); i++ {
var err error
// Slice off an extra encoded column from rf.keyRemainingBytes.
Expand All @@ -693,26 +692,8 @@ func (rf *Fetcher) NextKey(ctx context.Context) (rowDone bool, _ error) {
rf.keyRemainingBytes = rf.kv.Key[prefixLen:]
}

switch {
case len(rf.table.desc.GetFamilies()) == 1:
// If we only have one family, we know that there is only 1 k/v pair per row.
rowDone = true
case !unchangedPrefix:
// If the prefix of the key has changed, current key is from a different
// row than the previous one.
rowDone = true
default:
rowDone = false
}

if rf.indexKey != nil && rowDone {
// The current key belongs to a new row. Output the
// current row.
rf.indexKey = nil
return true, nil
}

return false, nil
rf.indexKey = nil
return true, nil
}

func (rf *Fetcher) prettyEncDatums(types []*types.T, vals []rowenc.EncDatum) string {
Expand Down Expand Up @@ -1079,7 +1060,7 @@ func (rf *Fetcher) NextRow(ctx context.Context) (row rowenc.EncDatumRow, err err
log.VEventf(ctx, 2, "fetched: %s -> %s", prettyKey, prettyVal)
}

rowDone, err := rf.NextKey(ctx)
rowDone, err := rf.nextKey(ctx)
if err != nil {
return nil, err
}
Expand Down
11 changes: 4 additions & 7 deletions pkg/sql/row/kv_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ const (
// unexpectedly.
func (f *KVFetcher) NextKV(
ctx context.Context, mvccDecodeStrategy MVCCDecodingStrategy,
) (moreKVs bool, kv roachpb.KeyValue, finalReferenceToBatch bool, err error) {
) (ok bool, kv roachpb.KeyValue, finalReferenceToBatch bool, err error) {
for {
// Only one of f.kvs or f.batchResponse will be set at a given time. Which
// one is set depends on the format returned by a given BatchRequest.
Expand Down Expand Up @@ -206,12 +206,9 @@ func (f *KVFetcher) NextKV(
}, lastKey, nil
}

moreKVs, f.kvs, f.batchResponse, err = f.nextBatch(ctx)
if err != nil {
return moreKVs, kv, false, err
}
if !moreKVs {
return false, kv, false, nil
ok, f.kvs, f.batchResponse, err = f.nextBatch(ctx)
if err != nil || !ok {
return ok, kv, false, err
}
f.newSpan = true
nBytes := len(f.batchResponse)
Expand Down

0 comments on commit a6f5016

Please sign in to comment.