Skip to content

Commit

Permalink
Merge #92283 #92301
Browse files Browse the repository at this point in the history
92283: roachtest: remove leftover comment r=andreimatei a=andreimatei

This comment used to be associated with a minVersion: "v19.2.0" setting, which dissapeared in the meantime.

Release note: None
Epic: None

92301: row: clean up fetchers r=yuzefovich a=yuzefovich

This commit performs a bunch of miscellaneous cleanups around the different fetcher objects in the `row` package. The following changes are made:
- `SpanKVFetcher` has been renamed into `KVProvider` to better indicate its purpose (of supplying a given set of KVs to the fetcher machinery).
- relatedly, `StartScanFrom` method has been renamed to `ConsumeKVProvider` since the former is only used with the `KVProvider` as an argument (with the corresponding change to the signature too).
- we no longer track the "number of batch requests issued" when using the `KVProvider` since there aren't any and the callers don't actually need this info. Still, to be safe this commit adds an additional check to `row.Fetcher.GetBatchRequestsIssued`.
- `singleKVFetcher` previously used for decoding a single KV for error messages has been removed and its usage replaced with the `KVProvider`.
- `BackupSSTKVFetcher` is deleted entirely. This hasn't been used as of 3caed9e.
- `StartScanFrom` method has been removed from `rowFetcher` interface since the processors don't need it.
- `rowFetcherStatCollector` now directly embeds `row.Fetcher` allowing it to just "inherit" different methods that don't require any instrumentation.

Epic: None

Release note: None

Co-authored-by: Andrei Matei <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
3 people committed Nov 22, 2022
3 parents ad04e9d + 4d365b5 + 4ca18ef commit 1ff09bf
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 248 deletions.
10 changes: 5 additions & 5 deletions pkg/ccl/changefeedccl/cdcevent/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,8 @@ type eventDecoder struct {
// Cached allocations for *row.Fetcher
rfCache *rowFetcherCache

// kvFetcher used to decode datums.
kvFetcher row.SpanKVFetcher
// kvProvider used to feed KVs into fetcher to decode them into datums.
kvProvider row.KVProvider

// factory for constructing event descriptors.
getEventDescriptor eventDescriptorFactory
Expand Down Expand Up @@ -451,9 +451,9 @@ func (d *eventDecoder) DecodeKV(
return Row{}, err
}

d.kvFetcher.KVs = d.kvFetcher.KVs[:0]
d.kvFetcher.KVs = append(d.kvFetcher.KVs, kv)
if err := d.fetcher.StartScanFrom(ctx, &d.kvFetcher); err != nil {
d.kvProvider.KVs = d.kvProvider.KVs[:0]
d.kvProvider.KVs = append(d.kvProvider.KVs, kv)
if err := d.fetcher.ConsumeKVProvider(ctx, &d.kvProvider); err != nil {
return Row{}, err
}

Expand Down
14 changes: 7 additions & 7 deletions pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import (
"github.com/cockroachdb/errors"
)

// rowFetcherCache maintains a cache of single table RowFetchers. Given a key
// rowFetcherCache maintains a cache of single table row.Fetchers. Given a key
// with an mvcc timestamp, it retrieves the correct TableDescriptor for that key
// and returns a Fetcher initialized with that table. This Fetcher's
// StartScanFrom can be used to turn that key (or all the keys making up the
// column families of one row) into a row.
// and returns a row.Fetcher initialized with that table. This Fetcher's
// ConsumeKVProvider() can be used to turn that key (or all the keys making up
// the column families of one row) into a row.
type rowFetcherCache struct {
codec keys.SQLCodec
leaseMgr *lease.Manager
Expand Down Expand Up @@ -237,9 +237,9 @@ func (c *rowFetcherCache) RowFetcherForColumnFamily(
if err := rf.Init(
context.TODO(),
row.FetcherInitArgs{
WillUseCustomKVBatchFetcher: true,
Alloc: &c.a,
Spec: &spec,
WillUseKVProvider: true,
Alloc: &c.a,
Spec: &spec,
},
); err != nil {
return nil, nil, err
Expand Down
9 changes: 2 additions & 7 deletions pkg/cmd/roachtest/tests/acceptance.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,8 @@ func registerAcceptance(r registry.Registry) {
},
registry.OwnerTestEng: {
{
name: "version-upgrade",
fn: runVersionUpgrade,
// This test doesn't like running on old versions because it upgrades to
// the latest released version and then it tries to "head", where head is
// the cockroach binary built from the branch on which the test is
// running. If that branch corresponds to an older release, then upgrading
// to head after 19.2 fails.
name: "version-upgrade",
fn: runVersionUpgrade,
timeout: 30 * time.Minute,
},
},
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ func (d *deleteRangeNode) startExec(params runParams) error {
if err := d.fetcher.Init(
params.ctx,
row.FetcherInitArgs{
WillUseCustomKVBatchFetcher: true,
Alloc: &tree.DatumAlloc{},
Spec: &spec,
WillUseKVProvider: true,
Alloc: &tree.DatumAlloc{},
Spec: &spec,
},
); err != nil {
return err
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/row/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ go_library(
"//pkg/sql/span",
"//pkg/sql/sqlerrors",
"//pkg/sql/types",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util",
"//pkg/util/admission",
Expand Down
43 changes: 7 additions & 36 deletions pkg/sql/row/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,32 +23,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// singleKVFetcher is a KVBatchFetcher that returns a single kv.
type singleKVFetcher struct {
kvs [1]roachpb.KeyValue
done bool
}

var _ KVBatchFetcher = &singleKVFetcher{}

// nextBatch implements the KVBatchFetcher interface.
func (f *singleKVFetcher) nextBatch(ctx context.Context) (kvBatchFetcherResponse, error) {
if f.done {
return kvBatchFetcherResponse{moreKVs: false}, nil
}
f.done = true
return kvBatchFetcherResponse{
moreKVs: true,
kvs: f.kvs[:],
}, nil
}

// ConvertBatchError attempts to map a key-value error generated during a
// key-value batch operating over the specified table to a user friendly SQL
// error.
Expand Down Expand Up @@ -281,20 +260,20 @@ func DecodeRowInfo(
if err := rf.Init(
ctx,
FetcherInitArgs{
WillUseCustomKVBatchFetcher: true,
Alloc: &tree.DatumAlloc{},
Spec: &spec,
WillUseKVProvider: true,
Alloc: &tree.DatumAlloc{},
Spec: &spec,
},
); err != nil {
return nil, nil, nil, err
}
f := singleKVFetcher{kvs: [1]roachpb.KeyValue{{Key: key}}}
f := KVProvider{KVs: []roachpb.KeyValue{{Key: key}}}
if value != nil {
f.kvs[0].Value = *value
f.KVs[0].Value = *value
}
// Use the Fetcher to decode the single kv pair above by passing in
// this singleKVFetcher implementation, which doesn't actually hit KV.
if err := rf.StartScanFrom(ctx, &f); err != nil {
// this KVProvider implementation, which doesn't actually hit KV.
if err := rf.ConsumeKVProvider(ctx, &f); err != nil {
return nil, nil, nil, err
}
datums, err := rf.NextRowDecoded(ctx)
Expand All @@ -317,11 +296,3 @@ func DecodeRowInfo(
}
return index, names, values, nil
}

func (f *singleKVFetcher) SetupNextFetch(
context.Context, roachpb.Spans, []int, rowinfra.BytesLimit, rowinfra.KeyLimit,
) error {
return nil
}

func (f *singleKVFetcher) close(context.Context) {}
59 changes: 30 additions & 29 deletions pkg/sql/row/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ type tableInfo struct {
// meaningful when kv deletion tombstones are returned by the KVBatchFetcher,
// which the one used by `StartScan` (the common case) doesnt. Notably,
// changefeeds use this by providing raw kvs with tombstones unfiltered via
// `StartScanFrom`.
// `ConsumeKVProvider`.
rowIsDeleted bool
}

Expand Down Expand Up @@ -224,12 +224,13 @@ type FetcherInitArgs struct {
// kvstreamer.Streamer API under the hood. The caller is then expected to
// use only StartScan() method.
StreamingKVFetcher *KVFetcher
// WillUseCustomKVBatchFetcher, if true, indicates that the caller will only
// use StartScanFrom() method and will be providing its own KVBatchFetcher.
WillUseCustomKVBatchFetcher bool
// WillUseKVProvider, if true, indicates that the caller will only use
// ConsumeKVProvider() method and will use its own KVBatchFetcher
// implementation.
WillUseKVProvider bool
// Txn is the txn for the fetch. It might be nil, and the caller is expected
// to either provide the txn later via SetTxn() or to only use StartScanFrom
// method.
// to either provide the txn later via SetTxn() or to only use
// ConsumeKVProvider method.
Txn *kv.Txn
// Reverse denotes whether or not the spans should be read in reverse or not
// when StartScan* methods are invoked.
Expand Down Expand Up @@ -368,13 +369,13 @@ func (rf *Fetcher) Init(ctx context.Context, args FetcherInitArgs) error {
}

if args.StreamingKVFetcher != nil {
if args.WillUseCustomKVBatchFetcher {
if args.WillUseKVProvider {
return errors.AssertionFailedf(
"StreamingKVFetcher is non-nil when WillUseCustomKVBatchFetcher is true",
"StreamingKVFetcher is non-nil when WillUseKVProvider is true",
)
}
rf.kvFetcher = args.StreamingKVFetcher
} else if !args.WillUseCustomKVBatchFetcher {
} else if !args.WillUseKVProvider {
fetcherArgs := kvBatchFetcherArgs{
reverse: args.Reverse,
lockStrength: args.LockStrength,
Expand Down Expand Up @@ -423,8 +424,7 @@ func (rf *Fetcher) setTxnAndSendFn(txn *kv.Txn, sendFn sendFunc) error {
}

// StartScan initializes and starts the key-value scan. Can be used multiple
// times. Cannot be used if WillUseCustomKVBatchFetcher was set to true in
// Init().
// times. Cannot be used if WillUseKVProvider was set to true in Init().
//
// The fetcher takes ownership of the spans slice - it can modify the slice and
// will perform the memory accounting accordingly (if Init() was called with
Expand Down Expand Up @@ -467,8 +467,8 @@ func (rf *Fetcher) StartScan(
batchBytesLimit rowinfra.BytesLimit,
rowLimitHint rowinfra.RowLimit,
) error {
if rf.args.WillUseCustomKVBatchFetcher {
return errors.AssertionFailedf("StartScan is called instead of StartScanFrom")
if rf.args.WillUseKVProvider {
return errors.AssertionFailedf("StartScan is called instead of ConsumeKVProvider")
}
if len(spans) == 0 {
return errors.AssertionFailedf("no spans")
Expand Down Expand Up @@ -498,8 +498,8 @@ var TestingInconsistentScanSleep time.Duration
// that has passed. See the documentation for TableReaderSpec for more
// details.
//
// Can be used multiple times. Cannot be used if WillUseCustomKVBatchFetcher was
// set to true in Init().
// Can be used multiple times. Cannot be used if WillUseKVProvider was set to
// true in Init().
//
// Batch limits can only be used if the spans are ordered.
func (rf *Fetcher) StartInconsistentScan(
Expand All @@ -515,8 +515,8 @@ func (rf *Fetcher) StartInconsistentScan(
if rf.args.StreamingKVFetcher != nil {
return errors.AssertionFailedf("StartInconsistentScan is called instead of StartScan")
}
if rf.args.WillUseCustomKVBatchFetcher {
return errors.AssertionFailedf("StartInconsistentScan is called instead of StartScanFrom")
if rf.args.WillUseKVProvider {
return errors.AssertionFailedf("StartInconsistentScan is called instead of ConsumeKVProvider")
}
if len(spans) == 0 {
return errors.AssertionFailedf("no spans")
Expand Down Expand Up @@ -598,22 +598,20 @@ func (rf *Fetcher) rowLimitToKeyLimit(rowLimitHint rowinfra.RowLimit) rowinfra.K
return rowinfra.KeyLimit(int64(rowLimitHint)*int64(rf.table.spec.MaxKeysPerRow) + 1)
}

// StartScanFrom initializes and starts a scan from the given KVBatchFetcher.
// Can be used multiple times. Cannot be used if WillUseCustomKVBatchFetcher was
// set to false in Init().
func (rf *Fetcher) StartScanFrom(ctx context.Context, f KVBatchFetcher) error {
if !rf.args.WillUseCustomKVBatchFetcher {
return errors.AssertionFailedf("StartScanFrom is called instead of StartScan")
// ConsumeKVProvider initializes and starts a "scan" of the given KVProvider.
// Can be used multiple times. Cannot be used if WillUseKVProvider was set to
// false in Init().
func (rf *Fetcher) ConsumeKVProvider(ctx context.Context, f *KVProvider) error {
if !rf.args.WillUseKVProvider {
return errors.AssertionFailedf("ConsumeKVProvider is called instead of StartScan")
}
var batchRequestsIssued *int64
if rf.kvFetcher != nil {
rf.kvFetcher.Close(ctx)
// Keep the same counter across different fetchers.
batchRequestsIssued = rf.kvFetcher.atomics.batchRequestsIssued
} else {
batchRequestsIssued = new(int64)
}
rf.kvFetcher = newKVFetcher(f, batchRequestsIssued)
// We won't actually perform any KV reads, so we don't need to track the
// number of batch requests issued - the case of the KVProvider is handled
// separately in GetBatchRequestsIssued().
rf.kvFetcher = newKVFetcher(f, nil /* batchRequestsIssued */)
return rf.startScan(ctx)
}

Expand Down Expand Up @@ -1266,5 +1264,8 @@ func (rf *Fetcher) GetBytesRead() int64 {
// GetBatchRequestsIssued returns total number of BatchRequests issued by the
// underlying KVFetcher.
func (rf *Fetcher) GetBatchRequestsIssued() int64 {
if rf == nil || rf.args.WillUseKVProvider {
return 0
}
return rf.kvFetcher.GetBatchRequestsIssued()
}
8 changes: 4 additions & 4 deletions pkg/sql/row/fetcher_mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ func TestRowFetcherMVCCMetadata(t *testing.T) {
if err := rf.Init(
ctx,
row.FetcherInitArgs{
WillUseCustomKVBatchFetcher: true,
Alloc: &tree.DatumAlloc{},
Spec: &spec,
WillUseKVProvider: true,
Alloc: &tree.DatumAlloc{},
Spec: &spec,
},
); err != nil {
t.Fatal(err)
Expand All @@ -115,7 +115,7 @@ func TestRowFetcherMVCCMetadata(t *testing.T) {
log.Infof(ctx, "%v %v %v", kv.Key, kv.Value.Timestamp, kv.Value.PrettyPrint())
}

if err := rf.StartScanFrom(ctx, &row.SpanKVFetcher{KVs: kvs}); err != nil {
if err := rf.ConsumeKVProvider(ctx, &row.KVProvider{KVs: kvs}); err != nil {
t.Fatal(err)
}
var rows []rowWithMVCCMetadata
Expand Down
Loading

0 comments on commit 1ff09bf

Please sign in to comment.