From cd22662c67916e20d32ed0e857c83003ae9c23f0 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 30 Jun 2021 17:48:36 -0400 Subject: [PATCH 1/2] kv: remove spanset.GetDBEngine This was originally introduced to work around limitations in the `storage.Reader` interface, where only a `RocksDB` instance could be passed to `engine.ExportToSst`. Since then, a lot has changed, and this is no longer needed. Removing this is important, as it appears to undermine #55461 and make #66485 difficult. --- pkg/ccl/storageccl/export.go | 5 ++--- pkg/kv/kvserver/batcheval/cmd_add_sstable.go | 16 ++++---------- pkg/kv/kvserver/spanset/batch.go | 22 -------------------- 3 files changed, 6 insertions(+), 37 deletions(-) diff --git a/pkg/ccl/storageccl/export.go b/pkg/ccl/storageccl/export.go index 35169355c788..8955384f1bbf 100644 --- a/pkg/ccl/storageccl/export.go +++ b/pkg/ccl/storageccl/export.go @@ -69,7 +69,7 @@ func declareKeysExport( // evalExport dumps the requested keys into files of non-overlapping key ranges // in a format suitable for bulk ingest. func evalExport( - ctx context.Context, batch storage.Reader, cArgs batcheval.CommandArgs, resp roachpb.Response, + ctx context.Context, reader storage.Reader, cArgs batcheval.CommandArgs, resp roachpb.Response, ) (result.Result, error) { args := cArgs.Args.(*roachpb.ExportRequest) h := cArgs.Header @@ -147,7 +147,6 @@ func evalExport( return result.Result{}, errors.Errorf("unknown MVCC filter: %s", args.MVCCFilter) } - e := spanset.GetDBEngine(batch, roachpb.Span{Key: args.Key, EndKey: args.EndKey}) targetSize := uint64(args.TargetFileSize) // TODO(adityamaru): Remove this once we are able to set tenant specific // cluster settings. This takes the minimum of the system tenant's cluster @@ -169,7 +168,7 @@ func evalExport( var curSizeOfExportedSSTs int64 for start := args.Key; start != nil; { destFile := &storage.MemFile{} - summary, resume, err := e.ExportMVCCToSst(start, args.EndKey, args.StartTime, + summary, resume, err := reader.ExportMVCCToSst(start, args.EndKey, args.StartTime, h.Timestamp, exportAllRevisions, targetSize, maxSize, useTBI, destFile) if err != nil { return result.Result{}, err diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go index 0a0919d2256f..eeea6bdec055 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go @@ -16,7 +16,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -219,27 +218,20 @@ func EvalAddSSTable( func checkForKeyCollisions( _ context.Context, - readWriter storage.ReadWriter, + reader storage.Reader, mvccStartKey storage.MVCCKey, mvccEndKey storage.MVCCKey, data []byte, ) (enginepb.MVCCStats, error) { - // We could get a spansetBatch so fetch the underlying db engine as - // we need access to the underlying C.DBIterator later, and the - // dbIteratorGetter is not implemented by a spansetBatch. - dbEngine := spanset.GetDBEngine(readWriter, roachpb.Span{Key: mvccStartKey.Key, EndKey: mvccEndKey.Key}) - - emptyMVCCStats := enginepb.MVCCStats{} - // Create iterator over the existing data. - existingDataIter := dbEngine.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: mvccEndKey.Key}) + existingDataIter := reader.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: mvccEndKey.Key}) defer existingDataIter.Close() existingDataIter.SeekGE(mvccStartKey) if ok, err := existingDataIter.Valid(); err != nil { - return emptyMVCCStats, errors.Wrap(err, "checking for key collisions") + return enginepb.MVCCStats{}, errors.Wrap(err, "checking for key collisions") } else if !ok { // Target key range is empty, so it is safe to ingest. - return emptyMVCCStats, nil + return enginepb.MVCCStats{}, nil } return existingDataIter.CheckForKeyCollisions(data, mvccStartKey.Key, mvccEndKey.Key) diff --git a/pkg/kv/kvserver/spanset/batch.go b/pkg/kv/kvserver/spanset/batch.go index 2ae3a19634d1..23b3c6a3e9f5 100644 --- a/pkg/kv/kvserver/spanset/batch.go +++ b/pkg/kv/kvserver/spanset/batch.go @@ -489,28 +489,6 @@ func (s spanSetReader) PinEngineStateForIterators() error { return s.r.PinEngineStateForIterators() } -// GetDBEngine recursively searches for the underlying rocksDB engine. -func GetDBEngine(reader storage.Reader, span roachpb.Span) storage.Reader { - switch v := reader.(type) { - case ReadWriter: - return GetDBEngine(getSpanReader(v, span), span) - case *spanSetBatch: - return GetDBEngine(getSpanReader(v.ReadWriter, span), span) - default: - return reader - } -} - -// getSpanReader is a getter to access the storage.Reader field of the -// spansetReader. -func getSpanReader(r ReadWriter, span roachpb.Span) storage.Reader { - if err := r.spanSetReader.spans.CheckAllowed(SpanReadOnly, span); err != nil { - panic("Not in the span") - } - - return r.spanSetReader.r -} - type spanSetWriter struct { w storage.Writer spans *SpanSet From 65b0954910c7c4c9f8911a6b3c7fda0413ab13ba Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Fri, 25 Jun 2021 18:55:38 -0400 Subject: [PATCH 2/2] kvcoord: assert sanity when tracking in-flight write Release note: None --- pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go index cd3ffb283872..7be7b9ae9b17 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go @@ -578,6 +578,12 @@ func (tp *txnPipeliner) updateLockTracking( // need to prove that these succeeded sometime before we commit. header := req.Header() tp.ifWrites.insert(header.Key, header.Sequence) + // The request is not expected to be a ranged one, as we're only + // tracking one key in the ifWrites. Ranged requests do not admit + // ba.AsyncConsensus. + if roachpb.IsRange(req) { + log.Fatalf(ctx, "unexpected range request with AsyncConsensus: %s", req) + } } else { // If the lock acquisitions weren't performed asynchronously // then add them directly to our lock footprint. Locking read