Skip to content

Commit

Permalink
Merge #83404 #83541
Browse files Browse the repository at this point in the history
83404: batcheval: support MVCC range tombstones in `AddSSTable` r=aliher1911 a=erikgrinaker

**storage: expose `IterOptions` for SST iterators**

This patch allows callers to pass `IterOptions` for the new SST
iterators.

Release note: None

**batcheval: support MVCC range tombstones in `AddSSTable`**

This patch adds initial support for MVCC range tombstones in
`AddSSTable`, allowing ingestion of SSTs with such tombstones. Callers
must check the `MVCCRangeTombstones` version gate before writing them.

They are not yet supported with `DisallowConflicts`,
`DisallowShadowing`, and `DisallowShadowingBelow` (including checking
for conflicts with existing range tombstones), and will error if an SST
contains any MVCC range tombstone. This is not needed for the initial
cluster replication use-case, and proper support would require MVCC
stats support for range tombstones as well -- this will be implemented
later.

Resolves #76234.

Release note: None

83541: sql: fix panic in plan gist decoding r=mgartner a=mgartner

#### sql: fix panic in plan gist decoding

Previously, the plan gist decoder would set tables and indexes to nil in
`exec.Node`s when they could not be decoded, which can happen when
tables or indexes are dropped. This could cause node-crashing panics
because `explain.Emit` assumes that `exec.Node`s never have nil tables
or indexes.

This commit fixes the issue assigning two new structs to `exec.Node`
fields instead of `nil`: `unknownTable` which implements `cat.Table` and
`unknownIndex` which implements `cat.Index`. This avoids nil pointer
exceptions when `explain.Emit` tries to access these fields.

Fixes #83537

Release note (bug fix): A bug has been fixed which could crash nodes
when visiting the console statements page. This bug was present since
version 21.2.0.

#### sql: move panic catcher in plan gist decoder

This commit moves the panic catcher in `DecodePlanGistToPlan` to its
calling function `DecodePlanGistToRows`. This allows panics originating
in `Emit` to be caught without crashing nodes.

Release note: None


Co-authored-by: Erik Grinaker <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
  • Loading branch information
3 people committed Jul 1, 2022
3 parents 5d279a0 + 4591c7f + a793500 commit d324df1
Show file tree
Hide file tree
Showing 16 changed files with 682 additions and 99 deletions.
8 changes: 5 additions & 3 deletions pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ import (
"github.com/stretchr/testify/require"
)

var pointKV = storageutils.PointKV
var (
pointKV = storageutils.PointKV
rangeKV = storageutils.RangeKV
)

type kvs = storageutils.KVs

Expand Down Expand Up @@ -521,8 +524,7 @@ func TestWithOnSSTable(t *testing.T) {
pointKV("a", ts, "1"),
pointKV("b", ts, "2"),
pointKV("c", ts, "3"),
pointKV("d", ts, "4"),
pointKV("e", ts, "5"),
rangeKV("d", "e", ts, ""),
}
sst, sstStart, sstEnd := storageutils.MakeSST(t, srv.ClusterSettings(), sstKVs)
_, _, _, pErr := db.AddSSTableAtBatchTimestamp(ctx, sstStart, sstEnd, sst,
Expand Down
174 changes: 143 additions & 31 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,16 +218,18 @@ func EvalAddSSTable(
// Verify that the keys in the sstable are within the range specified by the
// request header, and if the request did not include pre-computed stats,
// compute the expected MVCC stats delta of ingesting the SST.
sstIter, err := storage.NewMemSSTIterator(sst, true)
sstIter, err := storage.NewPebbleMemSSTIterator(sst, true /* verify */, storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsAndRanges,
UpperBound: keys.MaxKey,
})
if err != nil {
return result.Result{}, err
}
defer sstIter.Close()

// Check that the first key is in the expected range.
sstIter.SeekGE(storage.MVCCKey{Key: keys.MinKey})
ok, err := sstIter.Valid()
if err != nil {
if ok, err := sstIter.Valid(); err != nil {
return result.Result{}, err
} else if ok {
if unsafeKey := sstIter.UnsafeKey(); unsafeKey.Less(start) {
Expand All @@ -249,8 +251,7 @@ func EvalAddSSTable(
}

sstIter.SeekGE(end)
ok, err = sstIter.Valid()
if err != nil {
if ok, err := sstIter.Valid(); err != nil {
return result.Result{}, err
} else if ok {
return result.Result{}, errors.Errorf("last key %s not in request range [%s,%s)",
Expand Down Expand Up @@ -340,55 +341,102 @@ func EvalAddSSTable(
if args.ReturnFollowingLikelyNonEmptySpanStart {
existingIter := spanset.DisableReaderAssertions(readWriter).NewMVCCIterator(
storage.MVCCKeyIterKind, // don't care if it is committed or not, just that it isn't empty.
storage.IterOptions{UpperBound: reply.RangeSpan.EndKey},
storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsAndRanges,
UpperBound: reply.RangeSpan.EndKey,
},
)
defer existingIter.Close()
existingIter.SeekGE(end)
ok, err = existingIter.Valid()
if err != nil {
if ok, err := existingIter.Valid(); err != nil {
return result.Result{}, errors.Wrap(err, "error while searching for non-empty span start")
} else if ok {
reply.FollowingLikelyNonEmptySpanStart = existingIter.Key().Key
}
}

// Ingest the SST as regular writes if requested. This is *not* a general
// transformation of any arbitrary SST to a WriteBatch: it assumes every key
// in the SST is a simple Set or RangeKeySet. This is already assumed
// elsewhere in this RPC though, so that's OK here.
//
// The MVCC engine writer methods do not record logical operations, but we
// must use them because e.g. storage.MVCCPut() changes the semantics of the
// write by not allowing writing below existing keys, and we want to retain
// parity with regular SST ingestion which does allow this. We therefore
// have to record logical ops ourselves.
if args.IngestAsWrites {
span.RecordStructured(&types.StringValue{Value: fmt.Sprintf("ingesting SST (%d keys/%d bytes) via regular write batch", stats.KeyCount, len(sst))})
log.VEventf(ctx, 2, "ingesting SST (%d keys/%d bytes) via regular write batch", stats.KeyCount, len(sst))
sstIter.SeekGE(storage.MVCCKey{Key: keys.MinKey})
for {
ok, err := sstIter.Valid()
if err != nil {

// Ingest point keys.
pointIter, err := storage.NewPebbleMemSSTIterator(sst, true /* verify */, storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsOnly,
UpperBound: keys.MaxKey,
})
if err != nil {
return result.Result{}, err
}
defer pointIter.Close()

for pointIter.SeekGE(storage.NilKey); ; pointIter.Next() {
if ok, err := pointIter.Valid(); err != nil {
return result.Result{}, err
} else if !ok {
break
}
// NB: This is *not* a general transformation of any arbitrary SST to a
// WriteBatch: it assumes every key in the SST is a simple Set. This is
// already assumed elsewhere in this RPC though, so that's OK here.
k := sstIter.UnsafeKey()
if k.Timestamp.IsEmpty() {
if err := readWriter.PutUnversioned(k.Key, sstIter.UnsafeValue()); err != nil {
key := pointIter.UnsafeKey()
if key.Timestamp.IsEmpty() {
if err := readWriter.PutUnversioned(key.Key, pointIter.UnsafeValue()); err != nil {
return result.Result{}, err
}
} else {
if err := readWriter.PutRawMVCC(k, sstIter.UnsafeValue()); err != nil {
if err := readWriter.PutRawMVCC(key, pointIter.UnsafeValue()); err != nil {
return result.Result{}, err
}
}
// The above MVCC functions do not record logical operations, but we must
// use them because e.g. storage.MVCCPut() changes the semantics of the
// write by not allowing writing below existing keys, and we want to
// retain parity with regular SST ingestion which does allow this. We
// therefore record these operations ourselves.
if sstToReqTS.IsSet() {
readWriter.LogLogicalOp(storage.MVCCWriteValueOpType, storage.MVCCLogicalOpDetails{
Key: k.Key,
Timestamp: k.Timestamp,
Key: key.Key,
Timestamp: key.Timestamp,
})
}
sstIter.Next()
}

// Ingest range keys.
rangeIter, err := storage.NewPebbleMemSSTIterator(sst, true /* verify */, storage.IterOptions{
KeyTypes: storage.IterKeyTypeRangesOnly,
UpperBound: keys.MaxKey,
})
if err != nil {
return result.Result{}, err
}
defer rangeIter.Close()

for rangeIter.SeekGE(storage.NilKey); ; rangeIter.Next() {
if ok, err := rangeIter.Valid(); err != nil {
return result.Result{}, err
} else if !ok {
break
}
for _, rkv := range rangeIter.RangeKeys() {
mvccValue, err := storage.DecodeMVCCValue(rkv.Value)
if err != nil {
return result.Result{}, err
}
if err = readWriter.ExperimentalPutMVCCRangeKey(rkv.RangeKey, mvccValue); err != nil {
return result.Result{}, err
}
if sstToReqTS.IsSet() {
readWriter.LogLogicalOp(storage.MVCCDeleteRangeOpType, storage.MVCCLogicalOpDetails{
Key: rkv.RangeKey.StartKey,
EndKey: rkv.RangeKey.EndKey,
Timestamp: rkv.RangeKey.Timestamp,
})
}
}
}

return result.Result{
Replicated: kvserverpb.ReplicatedEvalResult{
MVCCHistoryMutation: mvccHistoryMutation,
Expand Down Expand Up @@ -417,18 +465,23 @@ func EvalAddSSTable(
// assertSSTContents checks that the SST contains expected inputs:
//
// * Only SST set operations (not explicitly verified).
// * No intents, tombstones, or unversioned values.
// * No intents or unversioned values.
// * If sstTimestamp is set, all MVCC timestamps equal it.
// * MVCCValueHeader is empty.
// * Given MVCC stats match the SST contents.
func assertSSTContents(sst []byte, sstTimestamp hlc.Timestamp, stats *enginepb.MVCCStats) error {
iter, err := storage.NewMemSSTIterator(sst, true)

// Check SST point keys.
iter, err := storage.NewPebbleMemSSTIterator(sst, true /* verify */, storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsOnly,
UpperBound: keys.MaxKey,
})
if err != nil {
return err
}
defer iter.Close()

// Check SST KV pairs.
for iter.SeekGE(storage.MVCCKey{Key: keys.MinKey}); ; iter.Next() {
for iter.SeekGE(storage.NilKey); ; iter.Next() {
if ok, err := iter.Valid(); err != nil {
return err
} else if !ok {
Expand All @@ -443,12 +496,71 @@ func assertSSTContents(sst []byte, sstTimestamp hlc.Timestamp, stats *enginepb.M
return errors.AssertionFailedf("SST has unexpected timestamp %s (expected %s) for key %s",
key.Timestamp, sstTimestamp, key.Key)
}
value, err := storage.DecodeMVCCValue(iter.UnsafeValue())
if err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
"SST contains invalid value for key %s", key)
}
if value.MVCCValueHeader != (enginepb.MVCCValueHeader{}) {
return errors.AssertionFailedf("SST contains non-empty MVCC value header for key %s", key)
}
}

// Check SST range keys.
iter, err = storage.NewPebbleMemSSTIterator(sst, true /* verify */, storage.IterOptions{
KeyTypes: storage.IterKeyTypeRangesOnly,
UpperBound: keys.MaxKey,
})
if err != nil {
return err
}
defer iter.Close()

for iter.SeekGE(storage.NilKey); ; iter.Next() {
if ok, err := iter.Valid(); err != nil {
return err
} else if !ok {
break
}

for _, rkv := range iter.RangeKeys() {
if err := rkv.RangeKey.Validate(); err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err, "SST contains invalid range key")
}
if sstTimestamp.IsSet() && rkv.RangeKey.Timestamp != sstTimestamp {
return errors.AssertionFailedf(
"SST has unexpected timestamp %s (expected %s) for range key %s",
rkv.RangeKey.Timestamp, sstTimestamp, rkv.RangeKey)
}
value, err := storage.DecodeMVCCValue(rkv.Value)
if err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
"SST contains invalid range key value for range key %s", rkv.RangeKey)
}
if !value.IsTombstone() {
return errors.AssertionFailedf("SST contains non-tombstone range key %s", rkv.RangeKey)
}
if value.MVCCValueHeader != (enginepb.MVCCValueHeader{}) {
return errors.AssertionFailedf("SST contains non-empty MVCC value header for range key %s",
rkv.RangeKey)
}
}
}

// Compare statistics with those passed by client. We calculate them at the
// same timestamp as the given statistics, since they may contain
// timing-dependent values (typically MVCC garbage, i.e. multiple versions).
if stats != nil {
iter, err = storage.NewPebbleMemSSTIterator(sst, true /* verify */, storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsAndRanges,
LowerBound: keys.MinKey,
UpperBound: keys.MaxKey,
})
if err != nil {
return err
}
defer iter.Close()

given := *stats
actual, err := storage.ComputeStatsForRange(
iter, keys.MinKey, keys.MaxKey, given.LastUpdateNanos)
Expand Down
Loading

0 comments on commit d324df1

Please sign in to comment.