From e9e2a80361a25fd9f9b179f84be4c5c3d7e7d8cb Mon Sep 17 00:00:00 2001 From: Bilal Akhtar Date: Thu, 5 Dec 2019 15:18:24 -0500 Subject: [PATCH] storage, ccl: Gate AddSSTable()ing RocksDBv2 format SSTs on cluster version Change #42763 caused all SSTs written for ingestion to be written in the RocksDBv2 format as opposed to the leveldb format. This turned out to be an issue in mixed-version clusters where not all nodes can iterate over and ingest RocksDBv2 SSTs; nodes without commit 2beab580e3 (so 19.2.* and below) cannot iterate over these SSTs. This change reverts back to creating LevelDB SSTs for ingestion in the SSTBatcher only, unless the minimum cluster version is a 20.1 commit. Other cases where we make RocksDBv2 SSTs (eg. in replica_raftstorage) are okay and do not require this check, since those SSTs are ingested by the same node where they're written. Fixes #42081 . Release note: None. --- pkg/ccl/storageccl/import.go | 2 +- pkg/storage/bulk/buffering_adder.go | 2 +- pkg/storage/bulk/sst_batcher.go | 37 ++++++++++++++++++++-------- pkg/storage/bulk/sst_batcher_test.go | 3 ++- 4 files changed, 31 insertions(+), 13 deletions(-) diff --git a/pkg/ccl/storageccl/import.go b/pkg/ccl/storageccl/import.go index b4ca16ce731f..14ad788fda72 100644 --- a/pkg/ccl/storageccl/import.go +++ b/pkg/ccl/storageccl/import.go @@ -191,7 +191,7 @@ func evalImport(ctx context.Context, cArgs batcheval.CommandArgs) (*roachpb.Impo iters = append(iters, iter) } - batcher, err := bulk.MakeSSTBatcher(ctx, db, func() int64 { return MaxImportBatchSize(cArgs.EvalCtx.ClusterSettings()) }) + batcher, err := bulk.MakeSSTBatcher(ctx, db, cArgs.EvalCtx.ClusterSettings(), func() int64 { return MaxImportBatchSize(cArgs.EvalCtx.ClusterSettings()) }) if err != nil { return nil, err } diff --git a/pkg/storage/bulk/buffering_adder.go b/pkg/storage/bulk/buffering_adder.go index 933fd48177d6..cd6fd3b1fd55 100644 --- a/pkg/storage/bulk/buffering_adder.go +++ b/pkg/storage/bulk/buffering_adder.go @@ -207,7 +207,7 @@ func (b *BufferingAdder) Flush(ctx context.Context) error { } return nil } - if err := b.sink.Reset(); err != nil { + if err := b.sink.Reset(ctx); err != nil { return err } b.flushCounts.total++ diff --git a/pkg/storage/bulk/sst_batcher.go b/pkg/storage/bulk/sst_batcher.go index e2a7c2176dcf..aad5964b554a 100644 --- a/pkg/storage/bulk/sst_batcher.go +++ b/pkg/storage/bulk/sst_batcher.go @@ -105,10 +105,10 @@ type SSTBatcher struct { // MakeSSTBatcher makes a ready-to-use SSTBatcher. func MakeSSTBatcher( - ctx context.Context, db SSTSender, flushBytes func() int64, + ctx context.Context, db SSTSender, settings *cluster.Settings, flushBytes func() int64, ) (*SSTBatcher, error) { - b := &SSTBatcher{db: db, maxSize: flushBytes, disallowShadowing: true} - err := b.Reset() + b := &SSTBatcher{db: db, settings: settings, maxSize: flushBytes, disallowShadowing: true} + err := b.Reset(ctx) return b, err } @@ -174,10 +174,17 @@ func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key engine.MVCCKey, value [ } // Reset clears all state in the batcher and prepares it for reuse. -func (b *SSTBatcher) Reset() error { +func (b *SSTBatcher) Reset(ctx context.Context) error { b.sstWriter.Close() b.sstFile = &engine.MemFile{} - b.sstWriter = engine.MakeIngestionSSTWriter(b.sstFile) + // Create "Ingestion" SSTs in the newer RocksDBv2 format only if all nodes + // in the cluster can support it. Until then, for backward compatibility, + // create SSTs in the leveldb format ("backup" ones). + if cluster.Version.IsActive(ctx, b.settings, cluster.VersionStart20_1) { + b.sstWriter = engine.MakeIngestionSSTWriter(b.sstFile) + } else { + b.sstWriter = engine.MakeBackupSSTWriter(b.sstFile) + } b.batchStartKey = b.batchStartKey[:0] b.batchEndKey = b.batchEndKey[:0] b.batchEndValue = b.batchEndValue[:0] @@ -221,14 +228,14 @@ func (b *SSTBatcher) flushIfNeeded(ctx context.Context, nextKey roachpb.Key) err if err := b.doFlush(ctx, rangeFlush, nil); err != nil { return err } - return b.Reset() + return b.Reset(ctx) } if b.sstWriter.DataSize >= b.maxSize() { if err := b.doFlush(ctx, sizeFlush, nextKey); err != nil { return err } - return b.Reset() + return b.Reset(ctx) } return nil } @@ -424,7 +431,7 @@ func AddSSTable( if m, ok := errors.Cause(err).(*roachpb.RangeKeyMismatchError); ok { split := m.MismatchedRange.EndKey.AsRawKey() log.Infof(ctx, "SSTable cannot be added spanning range bounds %v, retrying...", split) - left, right, err := createSplitSSTable(ctx, db, item.start, split, item.disallowShadowing, iter) + left, right, err := createSplitSSTable(ctx, db, item.start, split, item.disallowShadowing, iter, settings) if err != nil { return err } @@ -469,9 +476,15 @@ func createSplitSSTable( start, splitKey roachpb.Key, disallowShadowing bool, iter engine.SimpleIterator, + settings *cluster.Settings, ) (*sstSpan, *sstSpan, error) { sstFile := &engine.MemFile{} - w := engine.MakeIngestionSSTWriter(sstFile) + var w engine.SSTWriter + if cluster.Version.IsActive(ctx, settings, cluster.VersionStart20_1) { + w = engine.MakeIngestionSSTWriter(sstFile) + } else { + w = engine.MakeBackupSSTWriter(sstFile) + } defer w.Close() split := false @@ -501,7 +514,11 @@ func createSplitSSTable( disallowShadowing: disallowShadowing, } *sstFile = engine.MemFile{} - w = engine.MakeIngestionSSTWriter(sstFile) + if cluster.Version.IsActive(ctx, settings, cluster.VersionStart20_1) { + w = engine.MakeIngestionSSTWriter(sstFile) + } else { + w = engine.MakeBackupSSTWriter(sstFile) + } split = true first = nil last = nil diff --git a/pkg/storage/bulk/sst_batcher_test.go b/pkg/storage/bulk/sst_batcher_test.go index a6e6b37933d4..69fc101b0ad3 100644 --- a/pkg/storage/bulk/sst_batcher_test.go +++ b/pkg/storage/bulk/sst_batcher_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/bulk" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" @@ -331,7 +332,7 @@ func TestAddBigSpanningSSTWithSplits(t *testing.T) { t.Logf("Adding %dkb sst spanning %d splits from %v to %v", len(sst)/kb, len(splits), start, end) if _, err := bulk.AddSSTable( - context.TODO(), mock, start, end, sst, false /* disallowShadowing */, enginepb.MVCCStats{}, nil, + context.TODO(), mock, start, end, sst, false /* disallowShadowing */, enginepb.MVCCStats{}, cluster.MakeTestingClusterSettings(), ); err != nil { t.Fatal(err) }