Add cluster version as feature gate for block properties #76278

Merged 2 commits on Feb 15, 2022
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -174,4 +174,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 21.2-62 set the active cluster version in the format '<major>.<minor>'
version version 21.2-64 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -186,6 +186,6 @@
<tr><td><code>trace.jaeger.agent</code></td><td>string</td><td><code></code></td><td>the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.</td></tr>
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-62</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-64</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
14 changes: 14 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
@@ -269,6 +269,16 @@ const (
DontProposeWriteTimestampForLeaseTransfers
// TenantSettingsTable adds the system table for tracking tenant usage.
TenantSettingsTable
// EnablePebbleFormatVersionBlockProperties enables a new Pebble SSTable
// format version for block property collectors.
// NB: this cluster version is paired with PebbleFormatBlockPropertyCollector
// in a two-phase migration. The first cluster version acts as a gate for
// updating the format major version on all stores, while the second cluster
// version is used as a feature gate. A node in a cluster that sees the second
// version is guaranteed to have seen the first version, and therefore has an
// engine running at the required format major version, as do all other nodes
// in the cluster.
EnablePebbleFormatVersionBlockProperties

// *************************************************
// Step (1): Add new versions here.
@@ -429,6 +439,10 @@ var versionsSingleton = keyedVersions{
Key: TenantSettingsTable,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 62},
},
{
Key: EnablePebbleFormatVersionBlockProperties,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 64},
},

// *************************************************
// Step (2): Add new versions here.
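The comment on EnablePebbleFormatVersionBlockProperties above describes a two-phase migration: the paired PebbleFormatBlockPropertyCollector version ratchets the Pebble format major version on every store, and this second version is what callers consult as a feature gate. A minimal sketch of such a gate check, assuming the usual clusterversion.Handle.IsActive pattern — canUseBlockProperties and its package are hypothetical, not code from this PR:

package blockpropgate // hypothetical example package, for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// canUseBlockProperties reports whether every node in the cluster is
// guaranteed to run at the Pebble format major version required for block
// property collectors. Seeing the second (feature-gate) version implies the
// first version has already been ratcheted on all stores.
func canUseBlockProperties(ctx context.Context, st *cluster.Settings) bool {
	return st.Version.IsActive(ctx, clusterversion.EnablePebbleFormatVersionBlockProperties)
}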
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions pkg/kv/bulk/sst_batcher.go
@@ -237,7 +237,7 @@ func (b *SSTBatcher) Reset(ctx context.Context) error {
// Create "Ingestion" SSTs in the newer RocksDBv2 format only if all nodes
// in the cluster can support it. Until then, for backward compatibility,
// create SSTs in the leveldb format ("backup" ones).
b.sstWriter = storage.MakeIngestionSSTWriter(b.sstFile)
b.sstWriter = storage.MakeIngestionSSTWriter(ctx, b.settings, b.sstFile)
b.batchStartKey = b.batchStartKey[:0]
b.batchEndKey = b.batchEndKey[:0]
b.batchEndValue = b.batchEndValue[:0]
@@ -611,7 +611,7 @@ func createSplitSSTable(
settings *cluster.Settings,
) (*sstSpan, *sstSpan, error) {
sstFile := &storage.MemFile{}
w := storage.MakeIngestionSSTWriter(sstFile)
w := storage.MakeIngestionSSTWriter(ctx, settings, sstFile)
defer w.Close()

split := false
@@ -641,7 +641,7 @@ func createSplitSSTable(
disallowShadowingBelow: disallowShadowingBelow,
}
*sstFile = storage.MemFile{}
w = storage.MakeIngestionSSTWriter(sstFile)
w = storage.MakeIngestionSSTWriter(ctx, settings, sstFile)
split = true
first = nil
last = nil
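Most of the remaining changes thread a context and *cluster.Settings into storage.MakeIngestionSSTWriter so that the writer can choose its table format based on the active cluster version (the feature gate added above). A small call-site sketch under that assumption — buildIngestionSST and its package are hypothetical helpers, not part of this PR, and keys are assumed to be sorted with non-empty MVCC timestamps:

package blockpropgate // hypothetical example package, for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/storage"
)

// buildIngestionSST writes the given key/value pairs into an in-memory
// ingestion SST using the updated MakeIngestionSSTWriter signature, which
// now takes a context and cluster settings.
func buildIngestionSST(
	ctx context.Context, st *cluster.Settings, kvs []storage.MVCCKeyValue,
) ([]byte, error) {
	memFile := &storage.MemFile{}
	w := storage.MakeIngestionSSTWriter(ctx, st, memFile)
	defer w.Close()
	for _, kv := range kvs {
		// PutMVCC expects keys in sorted order with non-empty timestamps.
		if err := w.PutMVCC(kv.Key, kv.Value); err != nil {
			return nil, err
		}
	}
	if err := w.Finish(); err != nil {
		return nil, err
	}
	return memFile.Data(), nil
}

In tests, cluster.MakeTestingClusterSettings() can stand in for real settings, as several of the test changes in this diff do.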
7 changes: 5 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable.go
@@ -79,9 +79,12 @@ func EvalAddSSTable(
if args.WriteAtRequestTimestamp &&
(args.SSTTimestamp.IsEmpty() || h.Timestamp != args.SSTTimestamp ||
util.ConstantWithMetamorphicTestBool("addsst-rewrite-forced", false)) {
st := cArgs.EvalCtx.ClusterSettings()
// TODO(dt): use a quotapool.
concurrency := int(addSSTableRewriteConcurrency.Get(&cArgs.EvalCtx.ClusterSettings().SV))
sst, err = storage.UpdateSSTTimestamps(sst, args.SSTTimestamp, h.Timestamp, concurrency)
concurrency := int(addSSTableRewriteConcurrency.Get(&st.SV))
sst, err = storage.UpdateSSTTimestamps(
ctx, st, sst, args.SSTTimestamp, h.Timestamp, concurrency,
)
if err != nil {
return result.Result{}, errors.Wrap(err, "updating SST timestamps")
}
7 changes: 5 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_export_test.go
@@ -434,14 +434,17 @@ func TestExportGCThreshold(t *testing.T) {
// exportUsingGoIterator uses the legacy implementation of export, and is used
// as an oracle to check the correctness of pebbleExportToSst.
func exportUsingGoIterator(
ctx context.Context,
filter roachpb.MVCCFilter,
startTime, endTime hlc.Timestamp,
startKey, endKey roachpb.Key,
enableTimeBoundIteratorOptimization timeBoundOptimisation,
reader storage.Reader,
) ([]byte, error) {
memFile := &storage.MemFile{}
sst := storage.MakeIngestionSSTWriter(memFile)
sst := storage.MakeIngestionSSTWriter(
ctx, cluster.MakeTestingClusterSettings(), memFile,
)
defer sst.Close()

var skipTombstones bool
@@ -569,7 +572,7 @@ func assertEqualKVs(

// Run the oracle which is a legacy implementation of pebbleExportToSst
// backed by an MVCCIncrementalIterator.
expected, err := exportUsingGoIterator(filter, startTime, endTime,
expected, err := exportUsingGoIterator(ctx, filter, startTime, endTime,
startKey, endKey, enableTimeBoundIteratorOptimization, e)
if err != nil {
t.Fatalf("Oracle failed to export provided key range.")
8 changes: 5 additions & 3 deletions pkg/kv/kvserver/client_merge_test.go
@@ -3789,9 +3789,11 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
defer it.Close()
// Write a range deletion tombstone to each of the SSTs then put in the
// kv entries from the sender of the snapshot.
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
for _, r := range keyRanges {
sstFile := &storage.MemFile{}
sst := storage.MakeIngestionSSTWriter(sstFile)
sst := storage.MakeIngestionSSTWriter(ctx, st, sstFile)
if err := sst.ClearRawRange(r.Start, r.End); err != nil {
return err
}
@@ -3828,7 +3830,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
for _, k := range []roachpb.Key{keyB, keyC} {
rangeID := rangeIds[string(k)]
sstFile := &storage.MemFile{}
sst := storage.MakeIngestionSSTWriter(sstFile)
sst := storage.MakeIngestionSSTWriter(ctx, st, sstFile)
defer sst.Close()
r := rditer.MakeRangeIDLocalKeyRange(rangeID, false /* replicatedOnly */)
if err := sst.ClearRawRange(r.Start, r.End); err != nil {
@@ -3848,7 +3850,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {

// Construct an SST for the user key range of the subsumed replicas.
sstFile := &storage.MemFile{}
sst := storage.MakeIngestionSSTWriter(sstFile)
sst := storage.MakeIngestionSSTWriter(ctx, st, sstFile)
defer sst.Close()
desc := roachpb.RangeDescriptor{
StartKey: roachpb.RKey(keyD),
9 changes: 6 additions & 3 deletions pkg/kv/kvserver/helpers_test.go
@@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util"
@@ -397,9 +398,11 @@ func (r *Replica) LoadBasedSplitter() *split.Decider {
return &r.loadBasedSplitter
}

func MakeSSTable(key, value string, ts hlc.Timestamp) ([]byte, storage.MVCCKeyValue) {
func MakeSSTable(
ctx context.Context, key, value string, ts hlc.Timestamp,
) ([]byte, storage.MVCCKeyValue) {
sstFile := &storage.MemFile{}
sst := storage.MakeIngestionSSTWriter(sstFile)
sst := storage.MakeIngestionSSTWriter(ctx, cluster.MakeTestingClusterSettings(), sstFile)
defer sst.Close()

v := roachpb.MakeValueFromBytes([]byte(value))
@@ -427,7 +430,7 @@ func ProposeAddSSTable(ctx context.Context, key, val string, ts hlc.Timestamp, s
ba.RangeID = store.LookupReplica(roachpb.RKey(key)).RangeID

var addReq roachpb.AddSSTableRequest
addReq.Data, _ = MakeSSTable(key, val, ts)
addReq.Data, _ = MakeSSTable(ctx, key, val, ts)
addReq.Key = roachpb.Key(key)
addReq.EndKey = addReq.Key.Next()
ba.Add(&addReq)
12 changes: 9 additions & 3 deletions pkg/kv/kvserver/replica_raftstorage.go
@@ -877,7 +877,9 @@ func (r *Replica) applySnapshot(
}(timeutil.Now())

unreplicatedSSTFile := &storage.MemFile{}
unreplicatedSST := storage.MakeIngestionSSTWriter(unreplicatedSSTFile)
unreplicatedSST := storage.MakeIngestionSSTWriter(
ctx, r.ClusterSettings(), unreplicatedSSTFile,
)
defer unreplicatedSST.Close()

// Clearing the unreplicated state.
@@ -1111,7 +1113,9 @@ func (r *Replica) clearSubsumedReplicaDiskData(

// We have to create an SST for the subsumed replica's range-id local keys.
subsumedReplSSTFile := &storage.MemFile{}
subsumedReplSST := storage.MakeIngestionSSTWriter(subsumedReplSSTFile)
subsumedReplSST := storage.MakeIngestionSSTWriter(
ctx, r.ClusterSettings(), subsumedReplSSTFile,
)
defer subsumedReplSST.Close()
// NOTE: We set mustClearRange to true because we are setting
// RangeTombstoneKey. Since Clears and Puts need to be done in increasing
@@ -1167,7 +1171,9 @@ func (r *Replica) clearSubsumedReplicaDiskData(
for i := range keyRanges {
if totalKeyRanges[i].End.Compare(keyRanges[i].End) > 0 {
subsumedReplSSTFile := &storage.MemFile{}
subsumedReplSST := storage.MakeIngestionSSTWriter(subsumedReplSSTFile)
subsumedReplSST := storage.MakeIngestionSSTWriter(
ctx, r.ClusterSettings(), subsumedReplSSTFile,
)
defer subsumedReplSST.Close()
if err := storage.ClearRangeWithHeuristic(
r.store.Engine(),
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/replica_rangefeed_test.go
@@ -336,8 +336,9 @@ func TestReplicaRangefeed(t *testing.T) {
expVal6q.SetInt(7)
expVal6q.InitChecksum(roachpb.Key("q"))

st := cluster.MakeTestingClusterSettings()
sstFile := &storage.MemFile{}
sstWriter := storage.MakeIngestionSSTWriter(sstFile)
sstWriter := storage.MakeIngestionSSTWriter(ctx, st, sstFile)
defer sstWriter.Close()
require.NoError(t, sstWriter.PutMVCC(
storage.MVCCKey{Key: roachpb.Key("b"), Timestamp: ts6},
@@ -366,7 +367,7 @@ func TestReplicaRangefeed(t *testing.T) {
expVal7q.InitChecksum(roachpb.Key("q"))

sstFile = &storage.MemFile{}
sstWriter = storage.MakeIngestionSSTWriter(sstFile)
sstWriter = storage.MakeIngestionSSTWriter(ctx, st, sstFile)
defer sstWriter.Close()
require.NoError(t, sstWriter.PutMVCC(
storage.MVCCKey{Key: roachpb.Key("b"), Timestamp: ts7},
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_sideload_test.go
@@ -706,7 +706,7 @@ func TestRaftSSTableSideloading(t *testing.T) {

// Put a sideloaded proposal on the Range.
key, val := "don't", "care"
origSSTData, _ := MakeSSTable(key, val, hlc.Timestamp{}.Add(0, 1))
origSSTData, _ := MakeSSTable(ctx, key, val, hlc.Timestamp{}.Add(0, 1))
{

var addReq roachpb.AddSSTableRequest
7 changes: 5 additions & 2 deletions pkg/kv/kvserver/replica_sst_snapshot_storage_test.go
@@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -165,7 +166,9 @@ func TestMultiSSTWriterInitSST(t *testing.T) {
}
keyRanges := rditer.MakeReplicatedKeyRanges(&desc)

msstw, err := newMultiSSTWriter(ctx, scratch, keyRanges, 0)
msstw, err := newMultiSSTWriter(
ctx, cluster.MakeTestingClusterSettings(), scratch, keyRanges, 0,
)
require.NoError(t, err)
_, err = msstw.Finish(ctx)
require.NoError(t, err)
@@ -184,7 +187,7 @@
for _, r := range keyRanges {
func() {
sstFile := &storage.MemFile{}
sst := storage.MakeIngestionSSTWriter(sstFile)
sst := storage.MakeIngestionSSTWriter(ctx, cluster.MakeTestingClusterSettings(), sstFile)
defer sst.Close()
err := sst.ClearRawRange(r.Start, r.End)
require.NoError(t, err)
10 changes: 8 additions & 2 deletions pkg/kv/kvserver/store_snapshot.go
@@ -124,12 +124,14 @@ type kvBatchSnapshotStrategy struct {
sstChunkSize int64
// Only used on the receiver side.
scratch *SSTSnapshotStorageScratch
st *cluster.Settings
}

// multiSSTWriter is a wrapper around RocksDBSstFileWriter and
// SSTSnapshotStorageScratch that handles chunking SSTs and persisting them to
// disk.
type multiSSTWriter struct {
st *cluster.Settings
scratch *SSTSnapshotStorageScratch
currSST storage.SSTWriter
keyRanges []rditer.KeyRange
@@ -143,11 +145,13 @@ type multiSSTWriter struct {

func newMultiSSTWriter(
ctx context.Context,
st *cluster.Settings,
scratch *SSTSnapshotStorageScratch,
keyRanges []rditer.KeyRange,
sstChunkSize int64,
) (multiSSTWriter, error) {
msstw := multiSSTWriter{
st: st,
scratch: scratch,
keyRanges: keyRanges,
sstChunkSize: sstChunkSize,
@@ -163,7 +167,7 @@ func (msstw *multiSSTWriter) initSST(ctx context.Context) error {
if err != nil {
return errors.Wrap(err, "failed to create new sst file")
}
newSST := storage.MakeIngestionSSTWriter(newSSTFile)
newSST := storage.MakeIngestionSSTWriter(ctx, msstw.st, newSSTFile)
msstw.currSST = newSST
if err := msstw.currSST.ClearRawRange(
msstw.keyRanges[msstw.currRange].Start, msstw.keyRanges[msstw.currRange].End); err != nil {
@@ -242,7 +246,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
// At the moment we'll write at most five SSTs.
// TODO(jeffreyxiao): Re-evaluate as the default range size grows.
keyRanges := rditer.MakeReplicatedKeyRanges(header.State.Desc)
msstw, err := newMultiSSTWriter(ctx, kvSS.scratch, keyRanges, kvSS.sstChunkSize)
msstw, err := newMultiSSTWriter(ctx, kvSS.st, kvSS.scratch, keyRanges, kvSS.sstChunkSize)
if err != nil {
return noSnap, err
}
@@ -664,6 +668,7 @@ func (s *Store) receiveSnapshot(
ss = &kvBatchSnapshotStrategy{
scratch: s.sstSnapshotStorage.NewScratchSpace(header.State.Desc.RangeID, snapUUID),
sstChunkSize: snapshotSSTWriteSyncRate.Get(&s.cfg.Settings.SV),
st: s.ClusterSettings(),
}
defer ss.Close(ctx)
default:
@@ -1121,6 +1126,7 @@ func sendSnapshot(
batchSize: batchSize,
limiter: limiter,
newBatch: newBatch,
st: st,
}
default:
log.Fatalf(ctx, "unknown snapshot strategy: %s", header.Strategy)
1 change: 1 addition & 0 deletions pkg/storage/BUILD.bazel
@@ -160,6 +160,7 @@ go_test(
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_errors//oserror",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//sstable",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_cockroachdb_redact//:redact",
"@com_github_kr_pretty//:pretty",
4 changes: 3 additions & 1 deletion pkg/storage/bench_test.go
@@ -1583,8 +1583,10 @@ func runCheckSSTConflicts(b *testing.B, numEngineKeys, numVersions, numSstKeys i
// The engine contains keys numbered key-1, key-2, key-3, etc, while
// the SST contains keys numbered key-11, key-21, etc., that fit in
// between the engine keys without colliding.
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
sstFile := &MemFile{}
sstWriter := MakeIngestionSSTWriter(sstFile)
sstWriter := MakeIngestionSSTWriter(ctx, st, sstFile)
var sstStart, sstEnd MVCCKey
for i := 0; i < numSstKeys; i++ {
keyNum := int((float64(i) / float64(numSstKeys)) * float64(numEngineKeys))
1 change: 1 addition & 0 deletions pkg/storage/metamorphic/BUILD.bazel
@@ -37,6 +37,7 @@ go_test(
],
embed = [":metamorphic"],
deps = [
"//pkg/settings/cluster",
"//pkg/testutils",
"//pkg/testutils/skip",
"//pkg/util/leaktest",
1 change: 1 addition & 0 deletions pkg/storage/metamorphic/generator.go
@@ -111,6 +111,7 @@ type metaTestRunner struct {
nameToGenerator map[string]*opGenerator
ops []opRun
weights []int
st *cluster.Settings
}

func (m *metaTestRunner) init() {
5 changes: 5 additions & 0 deletions pkg/storage/metamorphic/meta_test.go
@@ -19,6 +19,7 @@ import (
"path/filepath"
"testing"

"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -78,6 +79,10 @@ func runMetaTestForEngines(run testRunForEngines) {
restarts: run.restarts,
engineSeq: run.engineSequence,
path: filepath.Join(tempDir, "store"),
// TODO(travers): Add metamorphic test support for different versions, which
// will give us better coverage across multiple format major versions and
// table versions.
st: cluster.MakeTestingClusterSettings(),
}
fmt.Printf("store path = %s\n", testRunner.path)
