From 9ecc61993311296d370786e975eda63cc44fdf93 Mon Sep 17 00:00:00 2001 From: Nick Travers Date: Fri, 18 Feb 2022 07:47:47 -0800 Subject: [PATCH] clusterversion,storage: support Pebblev2 table format The `Pebblev2` SSTable format adds support for range keys. Add two new cluster versions to provide the upgrade path - the first version for bumping the store, the second for use as a feature gate. Rework the table format inference for new SSTable writers to take a more conservative approach. By default, assume version `RocksDBv2`, and only bump up to a newer version if a cluster supports it. Previously a newer version was assumed and bumped down if the cluster didn't support it. Update `storage.MakeBackupSSTWriter` to take in a context and cluster settings which can be used to infer which table version to use. Enable range keys for backups and ingest, if the cluster supports it. Release note: None --- .../settings/settings-for-tenants.txt | 2 +- docs/generated/settings/settings.html | 2 +- pkg/ccl/backupccl/backup_processor.go | 2 +- .../backupccl/restore_data_processor_test.go | 11 +++++-- pkg/clusterversion/cockroach_versions.go | 19 +++++++++++ pkg/clusterversion/key_string.go | 6 ++-- .../batcheval/cmd_add_sstable_test.go | 2 +- pkg/storage/bench_pebble_test.go | 4 ++- pkg/storage/pebble.go | 15 +++++---- pkg/storage/sst_test.go | 5 +-- pkg/storage/sst_writer.go | 33 ++++++++++++------- pkg/storage/sst_writer_test.go | 2 +- 12 files changed, 73 insertions(+), 30 deletions(-) diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 44ccab61d25b..471fd2159831 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -181,4 +181,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 21.2-70 set the active cluster version in the format '.' +version version 21.2-74 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index dc97d194c842..be0133cc1187 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -194,6 +194,6 @@ trace.jaeger.agentstringthe address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -versionversion21.2-70set the active cluster version in the format '.' +versionversion21.2-74set the active cluster version in the format '.' diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index f033d94583e0..71cc4c8401f1 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -785,7 +785,7 @@ func (s *sstSink) open(ctx context.Context) error { } } s.out = w - s.sst = storage.MakeBackupSSTWriter(s.out) + s.sst = storage.MakeBackupSSTWriter(ctx, s.dest.Settings(), s.out) return nil } diff --git a/pkg/ccl/backupccl/restore_data_processor_test.go b/pkg/ccl/backupccl/restore_data_processor_test.go index dde017820eb3..a2252ff614ea 100644 --- a/pkg/ccl/backupccl/restore_data_processor_test.go +++ b/pkg/ccl/backupccl/restore_data_processor_test.go @@ -201,11 +201,13 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) { keySlice = append(keySlice, key) } + ctx := context.Background() + cs := cluster.MakeTestingClusterSettings() writeSST := func(t *testing.T, offsets []int) string { path := strconv.FormatInt(hlc.UnixNano(), 10) sstFile := &storage.MemFile{} - sst := storage.MakeBackupSSTWriter(sstFile) + sst := storage.MakeBackupSSTWriter(ctx, cs, sstFile) defer sst.Close() ts := hlc.NewClock(hlc.UnixNano, time.Nanosecond).Now() value := roachpb.MakeValueFromString("bar") @@ -248,8 +250,11 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) { }, }} - ctx := context.Background() - args := base.TestServerArgs{Knobs: knobs, ExternalIODir: dir} + args := base.TestServerArgs{ + Knobs: knobs, + ExternalIODir: dir, + Settings: cs, + } // TODO(dan): This currently doesn't work with AddSSTable on in-memory // stores because RocksDB's InMemoryEnv doesn't support NewRandomRWFile // (which breaks the global-seqno rewrite used when the added sstable diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 77c95b0454f9..9a31aefa23a5 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -291,6 +291,16 @@ const ( // EnableLeaseHolderRemoval enables removing a leaseholder and transferring the lease // during joint configuration, including to VOTER_INCOMING replicas. EnableLeaseHolderRemoval + // EnsurePebbleFormatVersionRangeKeys is the first step of a two-part + // migration that bumps Pebble's format major version to a version that + // supports range keys. + EnsurePebbleFormatVersionRangeKeys + // EnablePebbleFormatVersionRangeKeys is the second of a two-part migration + // and is used as the feature gate for use of range keys. Any node at this + // version is guaranteed to reside in a cluster where all nodes support range + // keys at the Pebble layer. + EnablePebbleFormatVersionRangeKeys + // ************************************************* // Step (1): Add new versions here. // Do not add new versions to a patch release. @@ -466,6 +476,15 @@ var versionsSingleton = keyedVersions{ Key: EnableLeaseHolderRemoval, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 70}, }, + { + Key: EnsurePebbleFormatVersionRangeKeys, + Version: roachpb.Version{Major: 21, Minor: 2, Internal: 72}, + }, + { + Key: EnablePebbleFormatVersionRangeKeys, + Version: roachpb.Version{Major: 21, Minor: 2, Internal: 74}, + }, + // ************************************************* // Step (2): Add new versions here. // Do not add new versions to a patch release. diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index 3a97efc98cc8..5d5fc454792b 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -44,11 +44,13 @@ func _() { _ = x[DisableSystemConfigGossipTrigger-33] _ = x[MVCCIndexBackfiller-34] _ = x[EnableLeaseHolderRemoval-35] + _ = x[EnsurePebbleFormatVersionRangeKeys-36] + _ = x[EnablePebbleFormatVersionRangeKeys-37] } -const _Key_name = "V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColAlterSystemStmtDiagReqsMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivilegesAddRaftAppliedIndexTermMigrationPostAddRaftAppliedIndexTermMigrationDontProposeWriteTimestampForLeaseTransfersTenantSettingsTableEnablePebbleFormatVersionBlockPropertiesDisableSystemConfigGossipTriggerMVCCIndexBackfillerEnableLeaseHolderRemoval" +const _Key_name = "V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColAlterSystemStmtDiagReqsMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivilegesAddRaftAppliedIndexTermMigrationPostAddRaftAppliedIndexTermMigrationDontProposeWriteTimestampForLeaseTransfersTenantSettingsTableEnablePebbleFormatVersionBlockPropertiesDisableSystemConfigGossipTriggerMVCCIndexBackfillerEnableLeaseHolderRemovalEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeys" -var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 175, 189, 230, 256, 275, 309, 321, 352, 376, 397, 425, 455, 483, 504, 517, 536, 570, 608, 642, 674, 710, 742, 778, 820, 839, 879, 911, 930, 954} +var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 175, 189, 230, 256, 275, 309, 321, 352, 376, 397, 425, 455, 483, 504, 517, 536, 570, 608, 642, 674, 710, 742, 778, 820, 839, 879, 911, 930, 954, 988, 1022} func (i Key) String() string { if i < 0 || i >= Key(len(_Key_index)-1) { diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go index dc5ac13cdc79..3d10d07254c7 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go @@ -1033,7 +1033,7 @@ func runTestDBAddSSTable( value.InitChecksum([]byte("foo")) sstFile := &storage.MemFile{} - w := storage.MakeBackupSSTWriter(sstFile) + w := storage.MakeBackupSSTWriter(ctx, cs, sstFile) defer w.Close() require.NoError(t, w.Put(key, value.RawBytes)) require.NoError(t, w.Finish()) diff --git a/pkg/storage/bench_pebble_test.go b/pkg/storage/bench_pebble_test.go index cdb94dd8247d..7a540849a7c2 100644 --- a/pkg/storage/bench_pebble_test.go +++ b/pkg/storage/bench_pebble_test.go @@ -17,6 +17,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -32,7 +33,8 @@ func setupMVCCPebble(b testing.TB, dir string) Engine { peb, err := Open( context.Background(), Filesystem(dir), - CacheSize(testCacheSize)) + CacheSize(testCacheSize), + Settings(cluster.MakeTestingClusterSettings())) if err != nil { b.Fatalf("could not create new pebble instance at %s: %+v", dir, err) } diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index fde8f277f707..ab2280a2007d 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -934,7 +934,7 @@ func (p *Pebble) ExportMVCCToSst( ) (roachpb.BulkOpSummary, roachpb.Key, hlc.Timestamp, error) { r := wrapReader(p) // Doing defer r.Free() does not inline. - summary, k, err := pebbleExportToSst(ctx, r, exportOptions, dest) + summary, k, err := pebbleExportToSst(ctx, p.settings, r, exportOptions, dest) r.Free() return summary, k.Key, k.Timestamp, err } @@ -1602,6 +1602,10 @@ func (p *Pebble) SetMinVersion(version roachpb.Version) error { formatVers := pebble.FormatMostCompatible // Cases are ordered from newer to older versions. switch { + case !version.Less(clusterversion.ByKey(clusterversion.EnsurePebbleFormatVersionRangeKeys)): + if formatVers < pebble.FormatRangeKeys { + formatVers = pebble.FormatRangeKeys + } case !version.Less(clusterversion.ByKey(clusterversion.PebbleFormatBlockPropertyCollector)): if formatVers < pebble.FormatBlockPropertyCollector { formatVers = pebble.FormatBlockPropertyCollector @@ -1707,7 +1711,7 @@ func (p *pebbleReadOnly) ExportMVCCToSst( ) (roachpb.BulkOpSummary, roachpb.Key, hlc.Timestamp, error) { r := wrapReader(p) // Doing defer r.Free() does not inline. - summary, k, err := pebbleExportToSst(ctx, r, exportOptions, dest) + summary, k, err := pebbleExportToSst(ctx, p.parent.settings, r, exportOptions, dest) r.Free() return summary, k.Key, k.Timestamp, err } @@ -1964,7 +1968,7 @@ func (p *pebbleSnapshot) ExportMVCCToSst( ) (roachpb.BulkOpSummary, roachpb.Key, hlc.Timestamp, error) { r := wrapReader(p) // Doing defer r.Free() does not inline. - summary, k, err := pebbleExportToSst(ctx, r, exportOptions, dest) + summary, k, err := pebbleExportToSst(ctx, p.settings, r, exportOptions, dest) r.Free() return summary, k.Key, k.Timestamp, err } @@ -2085,13 +2089,12 @@ func (e *ExceedMaxSizeError) Error() string { } func pebbleExportToSst( - ctx context.Context, reader Reader, options ExportOptions, dest io.Writer, + ctx context.Context, cs *cluster.Settings, reader Reader, options ExportOptions, dest io.Writer, ) (roachpb.BulkOpSummary, MVCCKey, error) { var span *tracing.Span ctx, span = tracing.ChildSpan(ctx, "pebbleExportToSst") - _ = ctx // ctx is currently unused, but this new ctx should be used below in the future. defer span.Finish() - sstWriter := MakeBackupSSTWriter(dest) + sstWriter := MakeBackupSSTWriter(ctx, cs, dest) defer sstWriter.Close() var rows RowCounter diff --git a/pkg/storage/sst_test.go b/pkg/storage/sst_test.go index ece6fd874edf..6b37349f135d 100644 --- a/pkg/storage/sst_test.go +++ b/pkg/storage/sst_test.go @@ -48,8 +48,9 @@ func TestCheckSSTConflictsMaxIntents(t *testing.T) { } // Create SST with keys equal to intents at txn2TS. + cs := cluster.MakeTestingClusterSettings() sstFile := &MemFile{} - sstWriter := MakeBackupSSTWriter(sstFile) + sstWriter := MakeBackupSSTWriter(context.Background(), cs, sstFile) defer sstWriter.Close() for _, k := range intents { key := MVCCKey{Key: roachpb.Key(k), Timestamp: txn2TS} @@ -64,7 +65,7 @@ func TestCheckSSTConflictsMaxIntents(t *testing.T) { for _, engineImpl := range mvccEngineImpls { t.Run(engineImpl.name, func(t *testing.T) { ctx := context.Background() - engine := engineImpl.create() + engine := engineImpl.create(Settings(cs)) defer engine.Close() // Write some committed keys and intents at txn1TS. diff --git a/pkg/storage/sst_writer.go b/pkg/storage/sst_writer.go index e36c86d8fea1..779767b26c1a 100644 --- a/pkg/storage/sst_writer.go +++ b/pkg/storage/sst_writer.go @@ -56,17 +56,22 @@ func (noopSyncCloser) Close() error { // MakeIngestionWriterOptions returns writer options suitable for writing SSTs // that will subsequently be ingested (e.g. with AddSSTable). -func MakeIngestionWriterOptions(ctx context.Context, st *cluster.Settings) sstable.WriterOptions { - opts := DefaultPebbleOptions().MakeWriterOptions(0, sstable.TableFormatPebblev1) - // Only enable block properties if this cluster version support it. - // NB: we check for the _second_ of the two cluster versions. The first is - // used as a barrier for the major format version bump in the store. Nodes - // that are at the second version are guaranteed by the cluster migration - // framework to have already bumped their store major format versions to a - // sufficient version by the first. - if !st.Version.IsActive(ctx, clusterversion.EnablePebbleFormatVersionBlockProperties) { +func MakeIngestionWriterOptions(ctx context.Context, cs *cluster.Settings) sstable.WriterOptions { + // By default, take a conservative approach and assume we don't have newer + // table features available. Upgrade to an appropriate version only if the + // cluster supports it. + format := sstable.TableFormatRocksDBv2 + // Cases are ordered from newer to older versions. + switch { + case cs.Version.IsActive(ctx, clusterversion.EnablePebbleFormatVersionRangeKeys): + format = sstable.TableFormatPebblev2 // Range keys. + case cs.Version.IsActive(ctx, clusterversion.EnablePebbleFormatVersionBlockProperties): + format = sstable.TableFormatPebblev1 // Block properties. + } + opts := DefaultPebbleOptions().MakeWriterOptions(0, format) + if format < sstable.TableFormatPebblev1 { + // Block properties aren't available at this version. Disable collection. opts.BlockPropertyCollectors = nil - opts.TableFormat = sstable.TableFormatRocksDBv2 } opts.MergerName = "nullptr" return opts @@ -74,8 +79,14 @@ func MakeIngestionWriterOptions(ctx context.Context, st *cluster.Settings) sstab // MakeBackupSSTWriter creates a new SSTWriter tailored for backup SSTs which // are typically only ever iterated in their entirety. -func MakeBackupSSTWriter(f io.Writer) SSTWriter { +func MakeBackupSSTWriter(ctx context.Context, cs *cluster.Settings, f io.Writer) SSTWriter { + // By default, take a conservative approach and assume we don't have newer + // table features available. Upgrade to an appropriate version only if the + // cluster supports it. opts := DefaultPebbleOptions().MakeWriterOptions(0, sstable.TableFormatRocksDBv2) + if cs.Version.IsActive(ctx, clusterversion.EnablePebbleFormatVersionRangeKeys) { + opts.TableFormat = sstable.TableFormatPebblev2 // Range keys. + } // Don't need BlockPropertyCollectors for backups. opts.BlockPropertyCollectors = nil diff --git a/pkg/storage/sst_writer_test.go b/pkg/storage/sst_writer_test.go index 812591eb7f93..9347cb9265e8 100644 --- a/pkg/storage/sst_writer_test.go +++ b/pkg/storage/sst_writer_test.go @@ -61,7 +61,7 @@ func makePebbleSST(t testing.TB, kvs []storage.MVCCKeyValue, ingestion bool) []b if ingestion { w = storage.MakeIngestionSSTWriter(ctx, st, f) } else { - w = storage.MakeBackupSSTWriter(f) + w = storage.MakeBackupSSTWriter(ctx, st, f) } defer w.Close()