Skip to content

Commit

Permalink
storage, ccl: Gate AddSSTable()ing RocksDBv2 format SSTs on cluster v…
Browse files Browse the repository at this point in the history
…ersion

Change #42763 caused all SSTs written for ingestion to
be written in the RocksDBv2 format as opposed to the leveldb format.
This turned out to be an issue in mixed-version clusters where not
all nodes can iterate over and ingest RocksDBv2 SSTs; nodes without
commit 2beab58  (so 19.2.* and below) cannot iterate over
these SSTs.

This change reverts back to creating LevelDB SSTs for ingestion
in the SSTBatcher only, unless the minimum cluster version is a 20.1
commit. Other cases where we make RocksDBv2 SSTs (eg. in replica_raftstorage)
are okay and do not require this check, since those SSTs are ingested
by the same node where they're written.

Fixes #42081 .

Release note: None.
  • Loading branch information
itsbilal authored and danhhz committed Dec 13, 2019
1 parent 62640b2 commit e9e2a80
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func evalImport(ctx context.Context, cArgs batcheval.CommandArgs) (*roachpb.Impo
iters = append(iters, iter)
}

batcher, err := bulk.MakeSSTBatcher(ctx, db, func() int64 { return MaxImportBatchSize(cArgs.EvalCtx.ClusterSettings()) })
batcher, err := bulk.MakeSSTBatcher(ctx, db, cArgs.EvalCtx.ClusterSettings(), func() int64 { return MaxImportBatchSize(cArgs.EvalCtx.ClusterSettings()) })
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/bulk/buffering_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (b *BufferingAdder) Flush(ctx context.Context) error {
}
return nil
}
if err := b.sink.Reset(); err != nil {
if err := b.sink.Reset(ctx); err != nil {
return err
}
b.flushCounts.total++
Expand Down
37 changes: 27 additions & 10 deletions pkg/storage/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ type SSTBatcher struct {

// MakeSSTBatcher makes a ready-to-use SSTBatcher.
func MakeSSTBatcher(
ctx context.Context, db SSTSender, flushBytes func() int64,
ctx context.Context, db SSTSender, settings *cluster.Settings, flushBytes func() int64,
) (*SSTBatcher, error) {
b := &SSTBatcher{db: db, maxSize: flushBytes, disallowShadowing: true}
err := b.Reset()
b := &SSTBatcher{db: db, settings: settings, maxSize: flushBytes, disallowShadowing: true}
err := b.Reset(ctx)
return b, err
}

Expand Down Expand Up @@ -174,10 +174,17 @@ func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key engine.MVCCKey, value [
}

// Reset clears all state in the batcher and prepares it for reuse.
func (b *SSTBatcher) Reset() error {
func (b *SSTBatcher) Reset(ctx context.Context) error {
b.sstWriter.Close()
b.sstFile = &engine.MemFile{}
b.sstWriter = engine.MakeIngestionSSTWriter(b.sstFile)
// Create "Ingestion" SSTs in the newer RocksDBv2 format only if all nodes
// in the cluster can support it. Until then, for backward compatibility,
// create SSTs in the leveldb format ("backup" ones).
if cluster.Version.IsActive(ctx, b.settings, cluster.VersionStart20_1) {
b.sstWriter = engine.MakeIngestionSSTWriter(b.sstFile)
} else {
b.sstWriter = engine.MakeBackupSSTWriter(b.sstFile)
}
b.batchStartKey = b.batchStartKey[:0]
b.batchEndKey = b.batchEndKey[:0]
b.batchEndValue = b.batchEndValue[:0]
Expand Down Expand Up @@ -221,14 +228,14 @@ func (b *SSTBatcher) flushIfNeeded(ctx context.Context, nextKey roachpb.Key) err
if err := b.doFlush(ctx, rangeFlush, nil); err != nil {
return err
}
return b.Reset()
return b.Reset(ctx)
}

if b.sstWriter.DataSize >= b.maxSize() {
if err := b.doFlush(ctx, sizeFlush, nextKey); err != nil {
return err
}
return b.Reset()
return b.Reset(ctx)
}
return nil
}
Expand Down Expand Up @@ -424,7 +431,7 @@ func AddSSTable(
if m, ok := errors.Cause(err).(*roachpb.RangeKeyMismatchError); ok {
split := m.MismatchedRange.EndKey.AsRawKey()
log.Infof(ctx, "SSTable cannot be added spanning range bounds %v, retrying...", split)
left, right, err := createSplitSSTable(ctx, db, item.start, split, item.disallowShadowing, iter)
left, right, err := createSplitSSTable(ctx, db, item.start, split, item.disallowShadowing, iter, settings)
if err != nil {
return err
}
Expand Down Expand Up @@ -469,9 +476,15 @@ func createSplitSSTable(
start, splitKey roachpb.Key,
disallowShadowing bool,
iter engine.SimpleIterator,
settings *cluster.Settings,
) (*sstSpan, *sstSpan, error) {
sstFile := &engine.MemFile{}
w := engine.MakeIngestionSSTWriter(sstFile)
var w engine.SSTWriter
if cluster.Version.IsActive(ctx, settings, cluster.VersionStart20_1) {
w = engine.MakeIngestionSSTWriter(sstFile)
} else {
w = engine.MakeBackupSSTWriter(sstFile)
}
defer w.Close()

split := false
Expand Down Expand Up @@ -501,7 +514,11 @@ func createSplitSSTable(
disallowShadowing: disallowShadowing,
}
*sstFile = engine.MemFile{}
w = engine.MakeIngestionSSTWriter(sstFile)
if cluster.Version.IsActive(ctx, settings, cluster.VersionStart20_1) {
w = engine.MakeIngestionSSTWriter(sstFile)
} else {
w = engine.MakeBackupSSTWriter(sstFile)
}
split = true
first = nil
last = nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/bulk/sst_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/bulk"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
Expand Down Expand Up @@ -331,7 +332,7 @@ func TestAddBigSpanningSSTWithSplits(t *testing.T) {

t.Logf("Adding %dkb sst spanning %d splits from %v to %v", len(sst)/kb, len(splits), start, end)
if _, err := bulk.AddSSTable(
context.TODO(), mock, start, end, sst, false /* disallowShadowing */, enginepb.MVCCStats{}, nil,
context.TODO(), mock, start, end, sst, false /* disallowShadowing */, enginepb.MVCCStats{}, cluster.MakeTestingClusterSettings(),
); err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit e9e2a80

Please sign in to comment.