diff --git a/DEPS.bzl b/DEPS.bzl index 4127d5b9443a..e69eac899587 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -1693,10 +1693,10 @@ def go_deps(): patches = [ "@com_github_cockroachdb_cockroach//build/patches:com_github_cockroachdb_pebble.patch", ], - sha256 = "f68528557224c2af2fd0b46199602f982fd44f813029040a1f3ceaf134633dc0", - strip_prefix = "github.com/cockroachdb/pebble@v0.0.0-20240308204553-8df4320c24e4", + sha256 = "bffe4ef26087a4e25f9deaad87ed1ea9849d3ea6032badce7cacb919d6614cc6", + strip_prefix = "github.com/cockroachdb/pebble@v0.0.0-20240312180812-51faab0a3555", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20240308204553-8df4320c24e4.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20240312180812-51faab0a3555.zip", ], ) go_repository( diff --git a/build/bazelutil/distdir_files.bzl b/build/bazelutil/distdir_files.bzl index 278af1edeb38..05e3e99fe114 100644 --- a/build/bazelutil/distdir_files.bzl +++ b/build/bazelutil/distdir_files.bzl @@ -330,7 +330,7 @@ DISTDIR_FILES = { "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/gostdlib/com_github_cockroachdb_gostdlib-v1.19.0.zip": "c4d516bcfe8c07b6fc09b8a9a07a95065b36c2855627cb3514e40c98f872b69e", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/logtags/com_github_cockroachdb_logtags-v0.0.0-20230118201751-21c54148d20b.zip": "ca7776f47e5fecb4c495490a679036bfc29d95bd7625290cfdb9abb0baf97476", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/metamorphic/com_github_cockroachdb_metamorphic-v0.0.0-20231108215700-4ba948b56895.zip": "28c8cf42192951b69378cf537be5a9a43f2aeb35542908cc4fe5f689505853ea", - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20240308204553-8df4320c24e4.zip": "f68528557224c2af2fd0b46199602f982fd44f813029040a1f3ceaf134633dc0", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20240312180812-51faab0a3555.zip": "bffe4ef26087a4e25f9deaad87ed1ea9849d3ea6032badce7cacb919d6614cc6", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/redact/com_github_cockroachdb_redact-v1.1.5.zip": "11b30528eb0dafc8bc1a5ba39d81277c257cbe6946a7564402f588357c164560", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/returncheck/com_github_cockroachdb_returncheck-v0.0.0-20200612231554-92cdbca611dd.zip": "ce92ba4352deec995b1f2eecf16eba7f5d51f5aa245a1c362dfe24c83d31f82b", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/stress/com_github_cockroachdb_stress-v0.0.0-20220803192808-1806698b1b7b.zip": "3fda531795c600daf25532a4f98be2a1335cd1e5e182c72789bca79f5f69fcc1", diff --git a/go.mod b/go.mod index 86199c4bfdc2..9546168b17d2 100644 --- a/go.mod +++ b/go.mod @@ -124,7 +124,7 @@ require ( github.com/cockroachdb/go-test-teamcity v0.0.0-20191211140407-cff980ad0a55 github.com/cockroachdb/gostdlib v1.19.0 github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b - github.com/cockroachdb/pebble v0.0.0-20240308204553-8df4320c24e4 + github.com/cockroachdb/pebble v0.0.0-20240312180812-51faab0a3555 github.com/cockroachdb/redact v1.1.5 github.com/cockroachdb/returncheck v0.0.0-20200612231554-92cdbca611dd github.com/cockroachdb/stress v0.0.0-20220803192808-1806698b1b7b diff --git a/go.sum b/go.sum index a31b22d8cb41..852e31e87f2f 100644 --- a/go.sum +++ b/go.sum @@ -508,8 +508,8 @@ github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZe github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895 h1:XANOgPYtvELQ/h4IrmPAohXqe2pWA8Bwhejr3VQoZsA= github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895/go.mod h1:aPd7gM9ov9M8v32Yy5NJrDyOcD8z642dqs+F0CeNXfA= -github.com/cockroachdb/pebble v0.0.0-20240308204553-8df4320c24e4 h1:yuJAmwkJTQjB5YyoNkmXlK9/2YR+jWizfE7crErqGhU= -github.com/cockroachdb/pebble v0.0.0-20240308204553-8df4320c24e4/go.mod h1:g0agBmtwky6biPBw0MO+GkiYRv9krOTZgpPw2rfha8c= +github.com/cockroachdb/pebble v0.0.0-20240312180812-51faab0a3555 h1:NRPlms/+HfNgTPMrvSTU/bKDDps4/6vSvPnogZ4HzYw= +github.com/cockroachdb/pebble v0.0.0-20240312180812-51faab0a3555/go.mod h1:g0agBmtwky6biPBw0MO+GkiYRv9krOTZgpPw2rfha8c= github.com/cockroachdb/redact v1.1.3/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30= github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index bb20648c59db..6ef7bc31db92 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -244,6 +244,7 @@ func backup( backupManifest.StartTime, backupManifest.EndTime, backupManifest.ElidedPrefix, + backupManifest.ClusterVersion.AtLeast(clusterversion.V24_1.Version()), ) if err != nil { return roachpb.RowCount{}, 0, err diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index f44c604b91a1..7eba8ba4dc70 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -487,14 +487,14 @@ func runBackupProcessor( if !span.firstKeyTS.IsEmpty() { splitMidKey = true } - req := &kvpb.ExportRequest{ - RequestHeader: kvpb.RequestHeaderFromSpan(span.span), - ResumeKeyTS: span.firstKeyTS, - StartTime: span.start, - MVCCFilter: spec.MVCCFilter, - TargetFileSize: batcheval.ExportRequestTargetFileSize.Get(&clusterSettings.SV), - SplitMidKey: splitMidKey, + RequestHeader: kvpb.RequestHeaderFromSpan(span.span), + ResumeKeyTS: span.firstKeyTS, + StartTime: span.start, + MVCCFilter: spec.MVCCFilter, + TargetFileSize: batcheval.ExportRequestTargetFileSize.Get(&clusterSettings.SV), + SplitMidKey: splitMidKey, + IncludeMVCCValueHeader: spec.IncludeMVCCValueHeader, } // If we're doing re-attempts but are not yet in the priority regime, diff --git a/pkg/ccl/backupccl/backup_processor_planning.go b/pkg/ccl/backupccl/backup_processor_planning.go index 9edcccb37f59..91b48f9b005b 100644 --- a/pkg/ccl/backupccl/backup_processor_planning.go +++ b/pkg/ccl/backupccl/backup_processor_planning.go @@ -46,6 +46,7 @@ func distBackupPlanSpecs( mvccFilter kvpb.MVCCFilter, startTime, endTime hlc.Timestamp, elide execinfrapb.ElidePrefix, + includeValueHeader bool, ) (map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, error) { var span *tracing.Span ctx, span = tracing.ChildSpan(ctx, "backupccl.distBackupPlanSpecs") @@ -98,17 +99,18 @@ func distBackupPlanSpecs( sqlInstanceIDToSpec := make(map[base.SQLInstanceID]*execinfrapb.BackupDataSpec) for _, partition := range spanPartitions { spec := &execinfrapb.BackupDataSpec{ - JobID: jobID, - Spans: partition.Spans, - DefaultURI: defaultURI, - URIsByLocalityKV: urisByLocalityKV, - MVCCFilter: mvccFilter, - Encryption: fileEncryption, - PKIDs: pkIDs, - BackupStartTime: startTime, - BackupEndTime: endTime, - UserProto: user.EncodeProto(), - ElidePrefix: elide, + JobID: jobID, + Spans: partition.Spans, + DefaultURI: defaultURI, + URIsByLocalityKV: urisByLocalityKV, + MVCCFilter: mvccFilter, + Encryption: fileEncryption, + PKIDs: pkIDs, + BackupStartTime: startTime, + BackupEndTime: endTime, + UserProto: user.EncodeProto(), + ElidePrefix: elide, + IncludeMVCCValueHeader: includeValueHeader, } sqlInstanceIDToSpec[partition.SQLInstanceID] = spec } @@ -121,16 +123,17 @@ func distBackupPlanSpecs( // which is not the leaseholder for any of the spans, but is for an // introduced span. spec := &execinfrapb.BackupDataSpec{ - JobID: jobID, - IntroducedSpans: partition.Spans, - DefaultURI: defaultURI, - URIsByLocalityKV: urisByLocalityKV, - MVCCFilter: mvccFilter, - Encryption: fileEncryption, - PKIDs: pkIDs, - BackupStartTime: startTime, - BackupEndTime: endTime, - UserProto: user.EncodeProto(), + JobID: jobID, + IntroducedSpans: partition.Spans, + DefaultURI: defaultURI, + URIsByLocalityKV: urisByLocalityKV, + MVCCFilter: mvccFilter, + Encryption: fileEncryption, + PKIDs: pkIDs, + BackupStartTime: startTime, + BackupEndTime: endTime, + UserProto: user.EncodeProto(), + IncludeMVCCValueHeader: includeValueHeader, } sqlInstanceIDToSpec[partition.SQLInstanceID] = spec } diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index e2ecf7d3ea4e..baa7d7a19ffa 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/bulk" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" @@ -554,7 +553,10 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( return summary, err } valueScratch = append(valueScratch[:0], v...) - value := roachpb.Value{RawBytes: valueScratch} + value, err := storage.DecodeValueFromMVCCValue(valueScratch) + if err != nil { + return summary, err + } key.Key, ok, err = kr.RewriteKey(key.Key, key.Timestamp.WallTime) @@ -581,7 +583,14 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( if verbose { log.Infof(ctx, "Put %s -> %s", key.Key, value.PrettyPrint()) } - if err := batcher.AddMVCCKey(ctx, key, value.RawBytes); err != nil { + + // Using valueScratch here assumes that + // DecodeValueFromMVCCValue, ClearChecksum, and + // InitChecksum don't copy/reallocate the slice they + // were given. We expect that value.ClearChecksum and + // value.InitChecksum calls above have modified + // valueScratch. + if err := batcher.AddMVCCKey(ctx, key, valueScratch); err != nil { return summary, errors.Wrapf(err, "adding to batch: %s -> %s", key, value.PrettyPrint()) } } diff --git a/pkg/cmd/cockroach-oss/BUILD.bazel b/pkg/cmd/cockroach-oss/BUILD.bazel index 755eca7271ab..731a7b5a6f75 100644 --- a/pkg/cmd/cockroach-oss/BUILD.bazel +++ b/pkg/cmd/cockroach-oss/BUILD.bazel @@ -15,6 +15,7 @@ go_library( go_binary( name = "cockroach-oss", embed = [":cockroach-oss_lib"], + exec_properties = {"Pool": "large"}, visibility = ["//visibility:public"], ) diff --git a/pkg/cmd/cockroach-short/BUILD.bazel b/pkg/cmd/cockroach-short/BUILD.bazel index 919d53a63866..2f89c15412a2 100644 --- a/pkg/cmd/cockroach-short/BUILD.bazel +++ b/pkg/cmd/cockroach-short/BUILD.bazel @@ -14,5 +14,6 @@ go_library( go_binary( name = "cockroach-short", embed = [":cockroach-short_lib"], + exec_properties = {"Pool": "large"}, visibility = ["//visibility:public"], ) diff --git a/pkg/cmd/cockroach-sql/BUILD.bazel b/pkg/cmd/cockroach-sql/BUILD.bazel index 94a2d2db1497..1808ee752cfa 100644 --- a/pkg/cmd/cockroach-sql/BUILD.bazel +++ b/pkg/cmd/cockroach-sql/BUILD.bazel @@ -26,5 +26,6 @@ go_library( go_binary( name = "cockroach-sql", embed = [":cockroach-sql_lib"], + exec_properties = {"Pool": "large"}, visibility = ["//visibility:public"], ) diff --git a/pkg/cmd/cockroach/BUILD.bazel b/pkg/cmd/cockroach/BUILD.bazel index 74d8fba934aa..b6da4a42daf9 100644 --- a/pkg/cmd/cockroach/BUILD.bazel +++ b/pkg/cmd/cockroach/BUILD.bazel @@ -25,5 +25,6 @@ disallowed_imports_test( go_binary( name = "cockroach", embed = [":cockroach_lib"], + exec_properties = {"Pool": "large"}, visibility = ["//visibility:public"], ) diff --git a/pkg/cmd/roachprod/BUILD.bazel b/pkg/cmd/roachprod/BUILD.bazel index 5440091cad0a..539eeccab280 100644 --- a/pkg/cmd/roachprod/BUILD.bazel +++ b/pkg/cmd/roachprod/BUILD.bazel @@ -35,5 +35,6 @@ go_library( go_binary( name = "roachprod", embed = [":roachprod_lib"], + exec_properties = {"Pool": "large"}, visibility = ["//visibility:public"], ) diff --git a/pkg/cmd/roachtest/tests/admission_control_elastic_io.go b/pkg/cmd/roachtest/tests/admission_control_elastic_io.go index cdf0a98fb9ae..447ec5390ca2 100644 --- a/pkg/cmd/roachtest/tests/admission_control_elastic_io.go +++ b/pkg/cmd/roachtest/tests/admission_control_elastic_io.go @@ -122,16 +122,24 @@ func registerElasticIO(r registry.Registry) { // not working, the threshold of 7 will be easily breached, since // regular tokens allow sub-levels to exceed 10. const subLevelThreshold = 7 + const sampleCountForL0Sublevel = 12 + var l0SublevelCount []float64 // Sleep initially for stability to be achieved, before measuring. time.Sleep(5 * time.Minute) for { - time.Sleep(30 * time.Second) + time.Sleep(10 * time.Second) val, err := getMetricVal(subLevelMetric) if err != nil { continue } - if val > subLevelThreshold { - t.Fatalf("sub-level count %f exceeded threshold", val) + l0SublevelCount = append(l0SublevelCount, val) + // We want to use the mean of the last 2m of data to avoid short-lived + // spikes causing failures. + if len(l0SublevelCount) >= sampleCountForL0Sublevel { + latestSampleMeanL0Sublevels := getMeanOverLastN(sampleCountForL0Sublevel, l0SublevelCount) + if latestSampleMeanL0Sublevels > subLevelThreshold { + t.Fatalf("sub-level mean %f over last %d iterations exceeded threshold", latestSampleMeanL0Sublevels, sampleCountForL0Sublevel) + } } if timeutil.Now().After(endTime) { return nil diff --git a/pkg/cmd/roachtest/tests/admission_control_intent_resolution.go b/pkg/cmd/roachtest/tests/admission_control_intent_resolution.go index a74aaa3f0a96..16de58b648aa 100644 --- a/pkg/cmd/roachtest/tests/admission_control_intent_resolution.go +++ b/pkg/cmd/roachtest/tests/admission_control_intent_resolution.go @@ -174,19 +174,3 @@ func registerIntentResolutionOverload(r registry.Registry) { }, }) } - -// Returns the mean over the last n samples. If n > len(items), returns the mean -// over the entire items slice. -func getMeanOverLastN(n int, items []float64) float64 { - count := n - if len(items) < n { - count = len(items) - } - sum := float64(0) - i := 0 - for i < count { - sum += items[len(items)-1-i] - i++ - } - return sum / float64(count) -} diff --git a/pkg/cmd/roachtest/tests/split.go b/pkg/cmd/roachtest/tests/split.go index 58eaa13e3163..88d54a80747c 100644 --- a/pkg/cmd/roachtest/tests/split.go +++ b/pkg/cmd/roachtest/tests/split.go @@ -409,7 +409,7 @@ func registerLoadSplits(r registry.Registry) { // YCSB/E has a zipfian distribution with 95% scans (limit 1k) and 5% // inserts. minimumRanges: 5, - maximumRanges: 15, + maximumRanges: 18, initialRangeCount: 2, load: ycsbSplitLoad{ workload: "e", diff --git a/pkg/cmd/roachtest/tests/util.go b/pkg/cmd/roachtest/tests/util.go index 85c5b9102978..d65ed33c3564 100644 --- a/pkg/cmd/roachtest/tests/util.go +++ b/pkg/cmd/roachtest/tests/util.go @@ -232,3 +232,19 @@ func maybeUseMemoryBudget(t test.Test, budget int) option.StartOpts { } return startOpts } + +// Returns the mean over the last n samples. If n > len(items), returns the mean +// over the entire items slice. +func getMeanOverLastN(n int, items []float64) float64 { + count := n + if len(items) < n { + count = len(items) + } + sum := float64(0) + i := 0 + for i < count { + sum += items[len(items)-1-i] + i++ + } + return sum / float64(count) +} diff --git a/pkg/kv/bulk/buffering_adder.go b/pkg/kv/bulk/buffering_adder.go index 3de6446cdb60..ef585396b84a 100644 --- a/pkg/kv/bulk/buffering_adder.go +++ b/pkg/kv/bulk/buffering_adder.go @@ -63,6 +63,12 @@ type BufferingAdder struct { // name of the BufferingAdder for the purpose of logging only. name string + // importEpoch specifies the ImportEpoch of the table the BufferingAdder + // is ingesting data as part of an IMPORT INTO job. If specified, the Bulk + // Adder's SSTBatcher will write the import epoch to each versioned value's + // metadata. + importEpoch uint32 + bulkMon *mon.BytesMonitor memAcc mon.BoundAccount @@ -96,7 +102,8 @@ func MakeBulkAdder( } b := &BufferingAdder{ - name: opts.Name, + name: opts.Name, + importEpoch: opts.ImportEpoch, sink: SSTBatcher{ name: opts.Name, db: db, @@ -303,8 +310,15 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error { for i := range b.curBuf.entries { mvccKey.Key = b.curBuf.Key(i) - if err := b.sink.AddMVCCKey(ctx, mvccKey, b.curBuf.Value(i)); err != nil { - return err + if b.importEpoch != 0 { + if err := b.sink.AddMVCCKeyWithImportEpoch(ctx, mvccKey, b.curBuf.Value(i), + b.importEpoch); err != nil { + return err + } + } else { + if err := b.sink.AddMVCCKey(ctx, mvccKey, b.curBuf.Value(i)); err != nil { + return err + } } } if err := b.sink.Flush(ctx); err != nil { diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go index 51dd1dc4bb23..e19ec6b05a03 100644 --- a/pkg/kv/bulk/sst_batcher.go +++ b/pkg/kv/bulk/sst_batcher.go @@ -329,6 +329,21 @@ func (b *SSTBatcher) SetOnFlush(onFlush func(summary kvpb.BulkOpSummary)) { b.mu.onFlush = onFlush } +func (b *SSTBatcher) AddMVCCKeyWithImportEpoch( + ctx context.Context, key storage.MVCCKey, value []byte, importEpoch uint32, +) error { + mvccVal, err := storage.DecodeMVCCValue(value) + if err != nil { + return err + } + mvccVal.MVCCValueHeader.ImportEpoch = importEpoch + encVal, err := storage.EncodeMVCCValue(mvccVal) + if err != nil { + return err + } + return b.AddMVCCKey(ctx, key, encVal) +} + // AddMVCCKey adds a key+timestamp/value pair to the batch (flushing if needed). // This is only for callers that want to control the timestamp on individual // keys -- like RESTORE where we want the restored data to look like the backup. @@ -389,8 +404,7 @@ func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value if !b.disallowShadowingBelow.IsEmpty() { b.updateMVCCStats(key, value) } - - return b.sstWriter.Put(key, value) + return b.sstWriter.PutRawMVCC(key, value) } // Reset clears all state in the batcher and prepares it for reuse. diff --git a/pkg/kv/bulk/sst_batcher_test.go b/pkg/kv/bulk/sst_batcher_test.go index 4912a4bbe3e3..85082fdfddd0 100644 --- a/pkg/kv/bulk/sst_batcher_test.go +++ b/pkg/kv/bulk/sst_batcher_test.go @@ -308,7 +308,8 @@ func runTestImport(t *testing.T, batchSizeValue int64) { mem := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil) reqs := limit.MakeConcurrentRequestLimiter("reqs", 1000) b, err := bulk.MakeBulkAdder( - ctx, kvDB, mockCache, s.ClusterSettings(), ts, kvserverbase.BulkAdderOptions{MaxBufferSize: batchSize}, mem, reqs, + ctx, kvDB, mockCache, s.ClusterSettings(), ts, + kvserverbase.BulkAdderOptions{MaxBufferSize: batchSize}, mem, reqs, ) require.NoError(t, err) @@ -361,3 +362,77 @@ func runTestImport(t *testing.T, batchSizeValue int64) { }) } } + +var DummyImportEpoch uint32 = 3 + +func TestImportEpochIngestion(t *testing.T) { + defer leaktest.AfterTest(t)() + + defer log.Scope(t).Close(t) + ctx := context.Background() + + mem := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil) + reqs := limit.MakeConcurrentRequestLimiter("reqs", 1000) + s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + b, err := bulk.MakeTestingSSTBatcher(ctx, kvDB, s.ClusterSettings(), + false, true, mem.MakeConcurrentBoundAccount(), reqs) + require.NoError(t, err) + defer b.Close(ctx) + + startKey := storageutils.PointKey("a", 1) + endKey := storageutils.PointKey("b", 1) + value := storageutils.StringValueRaw("myHumbleValue") + mvccValue, err := storage.DecodeMVCCValue(value) + require.NoError(t, err) + + require.NoError(t, b.AddMVCCKeyWithImportEpoch(ctx, startKey, value, DummyImportEpoch)) + require.NoError(t, b.AddMVCCKeyWithImportEpoch(ctx, endKey, value, DummyImportEpoch)) + require.NoError(t, b.Flush(ctx)) + + // Check that ingested key contains the dummy job ID + req := &kvpb.ExportRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: startKey.Key, + EndKey: endKey.Key, + }, + MVCCFilter: kvpb.MVCCFilter_All, + StartTime: hlc.Timestamp{}, + IncludeMVCCValueHeader: true, + } + + header := kvpb.Header{Timestamp: s.Clock().Now()} + resp, roachErr := kv.SendWrappedWith(ctx, + kvDB.NonTransactionalSender(), header, req) + require.NoError(t, roachErr.GoError()) + iterOpts := storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsOnly, + LowerBound: startKey.Key, + UpperBound: endKey.Key, + } + + checkedJobId := false + for _, file := range resp.(*kvpb.ExportResponse).Files { + it, err := storage.NewMemSSTIterator(file.SST, false /* verify */, iterOpts) + require.NoError(t, err) + defer it.Close() + for it.SeekGE(storage.NilKey); ; it.Next() { + ok, err := it.Valid() + require.NoError(t, err) + if !ok { + break + } + rawVal, err := it.UnsafeValue() + require.NoError(t, err) + val, err := storage.DecodeMVCCValue(rawVal) + require.NoError(t, err) + require.Equal(t, startKey, it.UnsafeKey()) + require.Equal(t, mvccValue.Value, val.Value) + require.Equal(t, DummyImportEpoch, val.ImportEpoch) + require.Equal(t, hlc.ClockTimestamp{}, val.LocalTimestamp) + checkedJobId = true + } + } + require.Equal(t, true, checkedJobId) +} diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index 75a5b8031ede..4fcedd9f5dab 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -1831,7 +1831,14 @@ message ExportRequest { FingerprintOptions fingerprint_options = 15 [(gogoproto.nullable) = false]; - // Next ID: 16 + // IncludeMVCCValueHeader controls whether the MVCCValueHeader is + // included in exported bytes. Callers should only set this when all + // readers of the returned SST are prepared to parse full a + // MVCCValue. Even when set, only fields appropriate for export are + // included. See storage.EncodeMVCCValueForExport for details. + bool include_mvcc_value_header = 16 [(gogoproto.customname) = "IncludeMVCCValueHeader"]; + + // Next ID: 17 } message FingerprintOptions { diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go index 5aceb0e9a83e..5701ec0577be 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go @@ -550,7 +550,7 @@ func EvalAddSSTable( // * Only SST set operations (not explicitly verified). // * No intents or unversioned values. // * If sstTimestamp is set, all MVCC timestamps equal it. -// * MVCCValueHeader is empty. +// * The LocalTimestamp in the MVCCValueHeader is empty. // * Given MVCC stats match the SST contents. func assertSSTContents(sst []byte, sstTimestamp hlc.Timestamp, stats *enginepb.MVCCStats) error { @@ -584,8 +584,9 @@ func assertSSTContents(sst []byte, sstTimestamp hlc.Timestamp, stats *enginepb.M return errors.NewAssertionErrorWithWrappedErrf(err, "SST contains invalid value for key %s", key) } - if value.MVCCValueHeader != (enginepb.MVCCValueHeader{}) { - return errors.AssertionFailedf("SST contains non-empty MVCC value header for key %s", key) + if !value.MVCCValueHeader.LocalTimestamp.IsEmpty() { + return errors.AssertionFailedf("SST contains non-empty Local Timestamp in the MVCC value"+ + " header for key %s", key) } } diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go index 0dbf41fae425..d871f9d8b157 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go @@ -144,7 +144,7 @@ func TestEvalAddSSTable(t *testing.T) { sst: kvs{pointKVWithLocalTS("a", 2, 1, "a2")}, expect: kvs{pointKVWithLocalTS("a", 2, 1, "a2")}, expectStatsEst: true, - expectErrRace: `SST contains non-empty MVCC value header for key "a"/2.000000000,0`, + expectErrRace: `SST contains non-empty Local Timestamp in the MVCC value header for key "a"/2.000000000,0`, }, "blind rejects local timestamp on range key under race only": { // unfortunately, for performance sst: kvs{rangeKVWithLocalTS("a", "d", 2, 1, "")}, @@ -297,7 +297,7 @@ func TestEvalAddSSTable(t *testing.T) { sst: kvs{pointKVWithLocalTS("a", 2, 1, "a2")}, expect: kvs{pointKVWithLocalTS("a", 10, 1, "a2")}, expectStatsEst: true, - expectErrRace: `SST contains non-empty MVCC value header for key "a"/2.000000000,0`, + expectErrRace: `SST contains non-empty Local Timestamp in the MVCC value header for key "a"/2.000000000,0`, }, "SSTTimestampToRequestTimestamp with DisallowConflicts causes estimated stats with range key masking": { reqTS: 5, diff --git a/pkg/kv/kvserver/batcheval/cmd_export.go b/pkg/kv/kvserver/batcheval/cmd_export.go index 6a1705004a6d..dea1d60e7487 100644 --- a/pkg/kv/kvserver/batcheval/cmd_export.go +++ b/pkg/kv/kvserver/batcheval/cmd_export.go @@ -197,6 +197,7 @@ func evalExport( TargetLockConflictBytes: targetLockConflictBytes, StopMidKey: args.SplitMidKey, ScanStats: cArgs.ScanStats, + IncludeMVCCValueHeader: args.IncludeMVCCValueHeader, } var summary kvpb.BulkOpSummary var resumeInfo storage.ExportRequestResumeInfo diff --git a/pkg/kv/kvserver/kvserverbase/bulk_adder.go b/pkg/kv/kvserver/kvserverbase/bulk_adder.go index 62323091a093..601eb1a3bc26 100644 --- a/pkg/kv/kvserver/kvserverbase/bulk_adder.go +++ b/pkg/kv/kvserver/kvserverbase/bulk_adder.go @@ -69,6 +69,15 @@ type BulkAdderOptions struct { // the first buffer to pick split points in the hope it is a representative // sample of the overall input. InitialSplitsIfUnordered int + + // ImportEpoch specifies the ImportEpoch of the table the BulkAdder + // is ingesting data into as part of an IMPORT INTO job. If specified, the Bulk + // Adder's SSTBatcher will write the import epoch to each versioned value's + // metadata. + // + // Callers should check that the cluster is at or above + // version 24.1 before setting this option. + ImportEpoch uint32 } // BulkAdderFactory describes a factory function for BulkAdders. diff --git a/pkg/multitenant/tenant_config.go b/pkg/multitenant/tenant_config.go index 62795c08fdbb..51160d084a75 100644 --- a/pkg/multitenant/tenant_config.go +++ b/pkg/multitenant/tenant_config.go @@ -32,17 +32,6 @@ var DefaultTenantSelect = settings.RegisterStringSetting( settings.WithName(DefaultClusterSelectSettingName), ) -// VerifyTenantService determines whether there should be an advisory -// interlock between changes to the tenant service and changes to the -// above cluster setting. -var VerifyTenantService = settings.RegisterBoolSetting( - settings.SystemOnly, - "server.controller.default_tenant.check_service.enabled", - "verify that the service mode is coherently set with the value of "+DefaultClusterSelectSettingName, - true, - settings.WithName(DefaultClusterSelectSettingName+".check_service.enabled"), -) - // WaitForClusterStartTimeout is the amount of time the tenant // controller will wait for the default virtual cluster to have an // active SQL server. diff --git a/pkg/settings/registry.go b/pkg/settings/registry.go index 76fc373cb001..35a694b259bc 100644 --- a/pkg/settings/registry.go +++ b/pkg/settings/registry.go @@ -229,17 +229,18 @@ var retiredSettings = map[InternalKey]struct{}{ "bulkio.restore.remove_regions.enabled": {}, // removed as of 24.1 - "storage.mvcc.range_tombstones.enabled": {}, - "changefeed.balance_range_distribution.enable": {}, - "changefeed.mux_rangefeed.enabled": {}, - "kv.rangefeed.catchup_scan_concurrency": {}, - "kv.rangefeed.scheduler.enabled": {}, - "physical_replication.producer.mux_rangefeeds.enabled": {}, - "kv.rangefeed.use_dedicated_connection_class.enabled": {}, - "sql.trace.session_eventlog.enabled": {}, - "sql.show_ranges_deprecated_behavior.enabled": {}, - "sql.drop_virtual_cluster.enabled": {}, - "cross_cluster_replication.enabled": {}, + "storage.mvcc.range_tombstones.enabled": {}, + "changefeed.balance_range_distribution.enable": {}, + "changefeed.mux_rangefeed.enabled": {}, + "kv.rangefeed.catchup_scan_concurrency": {}, + "kv.rangefeed.scheduler.enabled": {}, + "physical_replication.producer.mux_rangefeeds.enabled": {}, + "kv.rangefeed.use_dedicated_connection_class.enabled": {}, + "sql.trace.session_eventlog.enabled": {}, + "sql.show_ranges_deprecated_behavior.enabled": {}, + "sql.drop_virtual_cluster.enabled": {}, + "cross_cluster_replication.enabled": {}, + "server.controller.default_tenant.check_service.enabled": {}, } // sqlDefaultSettings is the list of "grandfathered" existing sql.defaults diff --git a/pkg/sql/backfill/mvcc_index_merger.go b/pkg/sql/backfill/mvcc_index_merger.go index 81ee80a3a87b..591fcca37447 100644 --- a/pkg/sql/backfill/mvcc_index_merger.go +++ b/pkg/sql/backfill/mvcc_index_merger.go @@ -103,6 +103,10 @@ const indexBackfillMergeProgressReportInterval = 10 * time.Second // Run runs the processor. func (ibm *IndexBackfillMerger) Run(ctx context.Context, output execinfra.RowReceiver) { + opName := "IndexBackfillMerger" + ctx = logtags.AddTag(ctx, opName, int(ibm.spec.Table.ID)) + ctx, span := execinfra.ProcessorSpan(ctx, ibm.flowCtx, opName, ibm.processorID) + defer span.Finish() // This method blocks until all worker goroutines exit, so it's safe to // close memory monitoring infra in defers. mergerMon := execinfra.NewMonitor(ctx, ibm.flowCtx.Cfg.BackfillerMonitor, "index-backfiller-merger-mon") @@ -113,10 +117,6 @@ func (ibm *IndexBackfillMerger) Run(ctx context.Context, output execinfra.RowRec defer ibm.muBoundAccount.Unlock() ibm.muBoundAccount.boundAccount.Close(ctx) }() - opName := "IndexBackfillMerger" - ctx = logtags.AddTag(ctx, opName, int(ibm.spec.Table.ID)) - ctx, span := execinfra.ProcessorSpan(ctx, ibm.flowCtx, opName, ibm.processorID) - defer span.Finish() defer output.ProducerDone() defer execinfra.SendTraceData(ctx, ibm.flowCtx, output) diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto index 5073bc3d99ce..ad8eb0bd48c6 100644 --- a/pkg/sql/execinfrapb/processors_bulk_io.proto +++ b/pkg/sql/execinfrapb/processors_bulk_io.proto @@ -318,7 +318,14 @@ message BackupDataSpec { optional string user_proto = 10 [(gogoproto.nullable) = false, (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/security/username.SQLUsernameProto"]; optional ElidePrefix elide_prefix = 12 [(gogoproto.nullable) = false]; - // NEXTID: 13. + + // IncludeMVCCValueHeader indicates whether the backup should be + // created with MVCCValueHeaders in the exported data. This should + // only be set on backups starting on cluster version 24.1 or + // greater. + optional bool include_mvcc_value_header = 13 [(gogoproto.nullable) = false, (gogoproto.customname) = "IncludeMVCCValueHeader"]; + + // NEXTID: 14. } message RestoreFileSpec { diff --git a/pkg/sql/importer/BUILD.bazel b/pkg/sql/importer/BUILD.bazel index 323e5d8acb2e..6a8008f0e59a 100644 --- a/pkg/sql/importer/BUILD.bazel +++ b/pkg/sql/importer/BUILD.bazel @@ -148,6 +148,7 @@ go_test( "exportparquet_test.go", "import_csv_mark_redaction_test.go", "import_into_test.go", + "import_mvcc_test.go", "import_processor_test.go", "import_stmt_test.go", "main_test.go", @@ -187,6 +188,7 @@ go_test( "//pkg/keys", "//pkg/keyvisualizer", "//pkg/kv", + "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvpb", "//pkg/kv/kvserver", "//pkg/kv/kvserver/kvserverbase", @@ -229,6 +231,7 @@ go_test( "//pkg/sql/stats", "//pkg/sql/tests", "//pkg/sql/types", + "//pkg/storage", "//pkg/testutils", "//pkg/testutils/datapathutils", "//pkg/testutils/jobutils", diff --git a/pkg/sql/importer/import_job.go b/pkg/sql/importer/import_job.go index 30fedbe7936a..f7e8cda2259c 100644 --- a/pkg/sql/importer/import_job.go +++ b/pkg/sql/importer/import_job.go @@ -292,6 +292,14 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error { } } + if len(details.Tables) > 1 { + for _, tab := range details.Tables { + if !tab.IsNew { + return errors.AssertionFailedf("all tables in multi-table import must be new") + } + } + } + procsPerNode := int(processorsPerNode.Get(&p.ExecCfg().Settings.SV)) res, err := ingestWithRetry(ctx, p, r.job, tables, typeDescs, files, format, details.Walltime, @@ -400,9 +408,11 @@ func (r *importResumer) prepareTablesForIngestion( var err error var newTableDescs []jobspb.ImportDetails_Table var desc *descpb.TableDescriptor + + useImportEpochs := p.ExecCfg().Settings.Version.IsActive(ctx, clusterversion.V24_1) for i, table := range details.Tables { if !table.IsNew { - desc, err = prepareExistingTablesForIngestion(ctx, txn, descsCol, table.Desc) + desc, err = prepareExistingTablesForIngestion(ctx, txn, descsCol, table.Desc, useImportEpochs) if err != nil { return importDetails, err } @@ -480,7 +490,11 @@ func (r *importResumer) prepareTablesForIngestion( // prepareExistingTablesForIngestion prepares descriptors for existing tables // being imported into. func prepareExistingTablesForIngestion( - ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, desc *descpb.TableDescriptor, + ctx context.Context, + txn *kv.Txn, + descsCol *descs.Collection, + desc *descpb.TableDescriptor, + useImportEpochs bool, ) (*descpb.TableDescriptor, error) { if len(desc.Mutations) > 0 { return nil, errors.Errorf("cannot IMPORT INTO a table with schema changes in progress -- try again later (pending mutation %s)", desc.Mutations[0].String()) @@ -500,7 +514,14 @@ func prepareExistingTablesForIngestion( // Take the table offline for import. // TODO(dt): audit everywhere we get table descs (leases or otherwise) to // ensure that filtering by state handles IMPORTING correctly. - importing.OfflineForImport() + + // We only use the new OfflineForImport on 24.1, which bumps + // the ImportEpoch, if we are completely on 24.1. + if useImportEpochs { + importing.OfflineForImport() + } else { + importing.SetOffline(tabledesc.OfflineReasonImporting) + } // TODO(dt): de-validate all the FKs. if err := descsCol.WriteDesc( diff --git a/pkg/sql/importer/import_mvcc_test.go b/pkg/sql/importer/import_mvcc_test.go new file mode 100644 index 000000000000..f1225b10576a --- /dev/null +++ b/pkg/sql/importer/import_mvcc_test.go @@ -0,0 +1,116 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package importer_test + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +// TestMVCCValueHeaderImportEpoch tests that the import job ID is properly +// stored in the MVCCValueHeader in an imported key's MVCCValue. +func TestMVCCValueHeaderImportEpoch(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + server, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) + s := server.ApplicationLayer() + defer server.Stopper().Stop(ctx) + sqlDB := sqlutils.MakeSQLRunner(db) + + sqlDB.Exec(t, `CREATE DATABASE d`) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == "GET" { + fmt.Fprint(w, "1") + } + })) + defer srv.Close() + + // Create a table where the first row ( in sort order) comes from an IMPORT + // while the second comes from an INSERT. + sqlDB.Exec(t, `CREATE TABLE d.t (a INT8)`) + sqlDB.Exec(t, `INSERT INTO d.t VALUES ('2')`) + sqlDB.Exec(t, `IMPORT INTO d.t CSV DATA ($1)`, srv.URL) + + // Conduct an export request to iterate over the keys in the table. + var tableID uint32 + sqlDB.QueryRow(t, `SELECT id FROM system.namespace WHERE name = $1`, + "t").Scan(&tableID) + + startKey := s.Codec().TablePrefix(tableID) + endKey := startKey.PrefixEnd() + + req := &kvpb.ExportRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: startKey, + EndKey: endKey, + }, + MVCCFilter: kvpb.MVCCFilter_All, + StartTime: hlc.Timestamp{}, + IncludeMVCCValueHeader: true, + } + + header := kvpb.Header{Timestamp: s.Clock().Now()} + resp, roachErr := kv.SendWrappedWith(ctx, + s.DistSenderI().(*kvcoord.DistSender), header, req) + require.NoError(t, roachErr.GoError()) + + iterOpts := storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsOnly, + LowerBound: startKey, + UpperBound: endKey, + } + + // Ensure there are 2 keys in the span, and only the first one contains job ID metadata + keyCount := 0 + for _, file := range resp.(*kvpb.ExportResponse).Files { + it, err := storage.NewMemSSTIterator(file.SST, false /* verify */, iterOpts) + require.NoError(t, err) + defer it.Close() + for it.SeekGE(storage.NilKey); ; it.Next() { + ok, err := it.Valid() + require.NoError(t, err) + if !ok { + break + } + rawVal, err := it.UnsafeValue() + require.NoError(t, err) + val, err := storage.DecodeMVCCValue(rawVal) + require.NoError(t, err) + if keyCount == 0 { + require.NotEqual(t, uint32(0), val.ImportEpoch) + } else if keyCount == 1 { + require.Equal(t, uint32(0), val.ImportEpoch) + } else { + t.Fatal("more than 2 keys in the table") + } + require.Equal(t, hlc.ClockTimestamp{}, val.LocalTimestamp) + keyCount++ + } + } +} diff --git a/pkg/sql/importer/import_processor.go b/pkg/sql/importer/import_processor.go index ef79797eac16..f49441738cad 100644 --- a/pkg/sql/importer/import_processor.go +++ b/pkg/sql/importer/import_processor.go @@ -389,6 +389,16 @@ func ingestKvs( // will hog memory as it tries to grow more aggressively. minBufferSize, maxBufferSize := importBufferConfigSizes(flowCtx.Cfg.Settings, true /* isPKAdder */) + + var bulkAdderImportEpoch uint32 + for _, v := range spec.Tables { + if bulkAdderImportEpoch == 0 { + bulkAdderImportEpoch = v.Desc.ImportEpoch + } else if bulkAdderImportEpoch != v.Desc.ImportEpoch { + return nil, errors.AssertionFailedf("inconsistent import epoch on multi-table import") + } + } + pkIndexAdder, err := flowCtx.Cfg.BulkAdder(ctx, flowCtx.Cfg.DB.KV(), writeTS, kvserverbase.BulkAdderOptions{ Name: pkAdderName, DisallowShadowingBelow: writeTS, @@ -397,6 +407,7 @@ func ingestKvs( MaxBufferSize: maxBufferSize, InitialSplitsIfUnordered: int(spec.InitialSplits), WriteAtBatchTimestamp: true, + ImportEpoch: bulkAdderImportEpoch, }) if err != nil { return nil, err @@ -413,6 +424,7 @@ func ingestKvs( MaxBufferSize: maxBufferSize, InitialSplitsIfUnordered: int(spec.InitialSplits), WriteAtBatchTimestamp: true, + ImportEpoch: bulkAdderImportEpoch, }) if err != nil { return nil, err diff --git a/pkg/sql/importer/import_processor_test.go b/pkg/sql/importer/import_processor_test.go index 3a959327ae6f..a1ecc42be252 100644 --- a/pkg/sql/importer/import_processor_test.go +++ b/pkg/sql/importer/import_processor_test.go @@ -243,7 +243,7 @@ func TestImportIgnoresProcessedFiles(t *testing.T) { Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ JobRegistry: &jobs.Registry{}, - Settings: &cluster.Settings{}, + Settings: cluster.MakeTestingClusterSettings(), ExternalStorage: externalStorageFactory, DB: fakeDB{}, BulkAdder: func( @@ -368,7 +368,7 @@ func TestImportHonorsResumePosition(t *testing.T) { Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ JobRegistry: &jobs.Registry{}, - Settings: &cluster.Settings{}, + Settings: cluster.MakeTestingClusterSettings(), ExternalStorage: externalStorageFactory, DB: fakeDB{}, BulkAdder: func( @@ -502,7 +502,7 @@ func TestImportHandlesDuplicateKVs(t *testing.T) { Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ JobRegistry: &jobs.Registry{}, - Settings: &cluster.Settings{}, + Settings: cluster.MakeTestingClusterSettings(), ExternalStorage: externalStorageFactory, DB: fakeDB{}, BulkAdder: func( diff --git a/pkg/sql/logictest/testdata/logic_test/lookup_join b/pkg/sql/logictest/testdata/logic_test/lookup_join index 6a2cf518ac4f..6b95e90bd70c 100644 --- a/pkg/sql/logictest/testdata/logic_test/lookup_join +++ b/pkg/sql/logictest/testdata/logic_test/lookup_join @@ -1,3 +1,5 @@ +skip under race + statement ok CREATE TABLE abc (a INT, b INT, c INT, PRIMARY KEY (a, c)); INSERT INTO abc VALUES (1, 1, 2), (2, 1, 1), (2, NULL, 2) diff --git a/pkg/sql/logictest/testdata/logic_test/stats b/pkg/sql/logictest/testdata/logic_test/stats index 6d1fb6385b3b..51bf5bfbbce6 100644 --- a/pkg/sql/logictest/testdata/logic_test/stats +++ b/pkg/sql/logictest/testdata/logic_test/stats @@ -1,5 +1,7 @@ # LogicTest: !fakedist-disk +skip under race + # Note that we disable the "forced disk spilling" config because the histograms # are dropped if the stats collection reaches the memory budget limit. diff --git a/pkg/sql/logictest/testdata/logic_test/tenant b/pkg/sql/logictest/testdata/logic_test/tenant index a304c3c649d7..a9375987c122 100644 --- a/pkg/sql/logictest/testdata/logic_test/tenant +++ b/pkg/sql/logictest/testdata/logic_test/tenant @@ -514,21 +514,9 @@ subtest regression_105115 statement ok CREATE TENANT noservice -statement error shared service not enabled for tenant "noservice" -SET CLUSTER SETTING server.controller.default_target_cluster = noservice - -statement ok -SET CLUSTER SETTING server.controller.default_target_cluster.check_service.enabled = false - statement ok SET CLUSTER SETTING server.controller.default_target_cluster = noservice -statement ok -RESET CLUSTER SETTING server.controller.default_target_cluster.check_service.enabled - -statement ok -RESET CLUSTER SETTING server.controller.default_target_cluster - statement ok DROP TENANT noservice; CREATE TENANT withservice; @@ -539,19 +527,10 @@ ALTER TENANT withservice START SERVICE SHARED statement ok SET CLUSTER SETTING server.controller.default_target_cluster = withservice -statement error cannot stop service while tenant is selected as default -ALTER TENANT withservice STOP SERVICE - -statement ok -SET CLUSTER SETTING server.controller.default_target_cluster.check_service.enabled = false - statement ok ALTER TENANT withservice STOP SERVICE # clean up -statement ok -RESET CLUSTER SETTING server.controller.default_target_cluster.check_service.enabled - statement ok RESET CLUSTER SETTING server.controller.default_target_cluster diff --git a/pkg/sql/logictest/testdata/logic_test/upsert b/pkg/sql/logictest/testdata/logic_test/upsert index 34567404fc27..8c318e2a7ed5 100644 --- a/pkg/sql/logictest/testdata/logic_test/upsert +++ b/pkg/sql/logictest/testdata/logic_test/upsert @@ -1,3 +1,5 @@ +skip under race + subtest strict statement ok diff --git a/pkg/sql/logictest/testdata/logic_test/vectorize_unsupported b/pkg/sql/logictest/testdata/logic_test/vectorize_unsupported index d654a808e453..048b9285e9c5 100644 --- a/pkg/sql/logictest/testdata/logic_test/vectorize_unsupported +++ b/pkg/sql/logictest/testdata/logic_test/vectorize_unsupported @@ -1,3 +1,5 @@ +skip under race + statement ok CREATE TABLE a (a INT, b INT, c INT4, PRIMARY KEY (a, b)); INSERT INTO a SELECT g//2, g, g FROM generate_series(0,2000) g(g) diff --git a/pkg/sql/logictest/testdata/logic_test/window b/pkg/sql/logictest/testdata/logic_test/window index 035a490879a3..6a8b3e318f62 100644 --- a/pkg/sql/logictest/testdata/logic_test/window +++ b/pkg/sql/logictest/testdata/logic_test/window @@ -1,3 +1,5 @@ +skip under race + statement ok CREATE TABLE kv ( -- don't add column "a" diff --git a/pkg/sql/set_cluster_setting.go b/pkg/sql/set_cluster_setting.go index 8834f695b947..6a6a6d5f5e5e 100644 --- a/pkg/sql/set_cluster_setting.go +++ b/pkg/sql/set_cluster_setting.go @@ -22,8 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/docs" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/multitenant" - "github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/settingswatcher" @@ -358,17 +356,6 @@ func (n *setClusterSettingNode) startExec(params runParams) error { // Report tracked cluster settings via telemetry. // TODO(justin): implement a more general mechanism for tracking these. switch n.name { - case multitenant.DefaultClusterSelectSettingName: - if multitenant.VerifyTenantService.Get(&n.st.SV) && expectedEncodedValue != "" { - tr, err := GetTenantRecordByName(params.ctx, n.st, params.p.InternalSQLTxn(), roachpb.TenantName(expectedEncodedValue)) - if err != nil { - return errors.Wrapf(err, "failed to lookup tenant %q", expectedEncodedValue) - } - if tr.ServiceMode != mtinfopb.ServiceModeShared { - return pgerror.Newf(pgcode.ObjectNotInPrerequisiteState, - "shared service not enabled for tenant %q", expectedEncodedValue) - } - } case catpb.AutoStatsEnabledSettingName: switch expectedEncodedValue { case "true": diff --git a/pkg/sql/tenant_update.go b/pkg/sql/tenant_update.go index aa8886950c9c..f3b6597b9fc5 100644 --- a/pkg/sql/tenant_update.go +++ b/pkg/sql/tenant_update.go @@ -16,7 +16,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/multitenant" "github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/serverpb" @@ -110,23 +109,6 @@ func validateTenantInfo( info.ServiceMode, info.DataState) } - // Sanity check. Note that this interlock is not a guarantee that - // the cluster setting will never be set to an invalid tenant. There - // is a race condition between changing the cluster setting and the - // check here. Generally, other subsystems should always tolerate - // when the cluster setting is set to a tenant without service (or - // even one that doesn't exist). - if multitenant.VerifyTenantService.Get(&settings.SV) && - info.ServiceMode == mtinfopb.ServiceModeNone && - info.Name != "" && - multitenant.DefaultTenantSelect.Get(&settings.SV) == string(info.Name) { - return errors.WithHintf( - pgerror.Newf(pgcode.ObjectNotInPrerequisiteState, - "cannot stop service while tenant is selected as default"), - "Update the cluster setting %q to a different value.", - multitenant.DefaultClusterSelectSettingName) - } - return nil } diff --git a/pkg/storage/enginepb/mvcc3.proto b/pkg/storage/enginepb/mvcc3.proto index a17cf5f6ba80..0c3baaacbbc7 100644 --- a/pkg/storage/enginepb/mvcc3.proto +++ b/pkg/storage/enginepb/mvcc3.proto @@ -198,6 +198,10 @@ message MVCCValueHeader { // not be available in changefeeds. This allows higher levels of the system to // control which writes are exported. bool omit_in_rangefeeds = 3; + + // ImportEpoch identifies the number of times a user has called IMPORT + // INTO on the table this key belongs to when the table was not empty. + uint32 import_epoch = 4; } // MVCCValueHeaderPure is not to be used directly. It's generated only for use of @@ -207,6 +211,7 @@ message MVCCValueHeaderPure { (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/hlc.ClockTimestamp"]; bool omit_in_rangefeeds = 3; + uint32 import_epoch = 4; } // MVCCValueHeaderCrdbTest is not to be used directly. It's generated only for use of // its marshaling methods by MVCCValueHeader. See the comment there. @@ -219,6 +224,7 @@ message MVCCValueHeaderCrdbTest { util.hlc.Timestamp local_timestamp = 1 [(gogoproto.nullable) = false, (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/hlc.ClockTimestamp"]; bool omit_in_rangefeeds = 3; + uint32 import_epoch = 4; } // MVCCStatsDelta is convertible to MVCCStats, but uses signed variable width diff --git a/pkg/storage/enginepb/mvcc3_test.go b/pkg/storage/enginepb/mvcc3_test.go index 565fc3f37b3d..16413279006c 100644 --- a/pkg/storage/enginepb/mvcc3_test.go +++ b/pkg/storage/enginepb/mvcc3_test.go @@ -31,6 +31,7 @@ func populatedMVCCValueHeader() MVCCValueHeader { allFieldsSet := MVCCValueHeader{ LocalTimestamp: hlc.ClockTimestamp{WallTime: 1, Logical: 1}, OmitInRangefeeds: true, + ImportEpoch: 1, } allFieldsSet.KVNemesisSeq.Set(123) return allFieldsSet diff --git a/pkg/storage/enginepb/mvcc3_valueheader.go b/pkg/storage/enginepb/mvcc3_valueheader.go index 17e9faa07a82..81f50ff5d2ab 100644 --- a/pkg/storage/enginepb/mvcc3_valueheader.go +++ b/pkg/storage/enginepb/mvcc3_valueheader.go @@ -22,6 +22,7 @@ func (h *MVCCValueHeader) pure() MVCCValueHeaderPure { return MVCCValueHeaderPure{ LocalTimestamp: h.LocalTimestamp, OmitInRangefeeds: h.OmitInRangefeeds, + ImportEpoch: h.ImportEpoch, } } diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index f6a4e09d9467..ab63da1d7116 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -7884,9 +7884,14 @@ func mvccExportToWriter( return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, errors.Wrapf(err, "decoding mvcc value %s", unsafeKey) } - // Export only the inner roachpb.Value, not the MVCCValue header. - unsafeValue = mvccValue.Value.RawBytes - + if opts.IncludeMVCCValueHeader { + unsafeValue, err = EncodeMVCCValueForExport(mvccValue) + if err != nil { + return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, errors.Wrapf(err, "repackaging imported mvcc value %s", unsafeKey) + } + } else { + unsafeValue = mvccValue.Value.RawBytes + } // Skip tombstone records when start time is zero (non-incremental) // and we are not exporting all versions. skip = skipTombstones && mvccValue.IsTombstone() @@ -8052,6 +8057,14 @@ type MVCCExportOptions struct { // FingerprintOptions controls how fingerprints are generated // when using MVCCExportFingerprint. FingerprintOptions MVCCExportFingerprintOptions + + // IncludeMVCCValueHeader controls whether we include + // MVCCValueHeaders in the exported data. When true, the + // portions of the header appropriate for export are included + // in the encoded values. Callers should be ready to decode + // full MVCCValue's in this case. + IncludeMVCCValueHeader bool + // ScanStats, if set, is updated with iterator stats upon export success of // failure. Non-iterator stats i.e., {NumGets,NumReverseScans} are left // unchanged, and NumScans is incremented by 1. diff --git a/pkg/storage/mvcc_value.go b/pkg/storage/mvcc_value.go index 85c288999f35..2b8ea7f426f4 100644 --- a/pkg/storage/mvcc_value.go +++ b/pkg/storage/mvcc_value.go @@ -12,6 +12,8 @@ package storage import ( "encoding/binary" + "fmt" + "strings" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -118,15 +120,34 @@ func (v MVCCValue) String() string { // SafeFormat implements the redact.SafeFormatter interface. func (v MVCCValue) SafeFormat(w redact.SafePrinter, _ rune) { if v.MVCCValueHeader != (enginepb.MVCCValueHeader{}) { + fields := make([]string, 0) w.Printf("{") if !v.LocalTimestamp.IsEmpty() { - w.Printf("localTs=%s", v.LocalTimestamp) + fields = append(fields, fmt.Sprintf("localTs=%s", v.LocalTimestamp)) } + if v.ImportEpoch != 0 { + fields = append(fields, fmt.Sprintf("importEpoch=%v", v.ImportEpoch)) + } + w.Print(strings.Join(fields, ", ")) w.Printf("}") } w.Print(v.Value.PrettyPrint()) } +// EncodeMVCCValueForExport encodes fields from the MVCCValueHeader +// that are appropriate for export out of the cluster. +func EncodeMVCCValueForExport(mvccValue MVCCValue) ([]byte, error) { + if mvccValue.ImportEpoch == 0 { + return mvccValue.Value.RawBytes, nil + } + + // We only export ImportEpoch. + mvccValue.MVCCValueHeader = enginepb.MVCCValueHeader{ + ImportEpoch: mvccValue.ImportEpoch, + } + return EncodeMVCCValue(mvccValue) +} + // When running a metamorphic build, disable the simple MVCC value encoding to // prevent code from assuming that the MVCCValue encoding is identical to the // roachpb.Value encoding. @@ -211,6 +232,32 @@ func DecodeMVCCValue(buf []byte) (MVCCValue, error) { return decodeExtendedMVCCValue(buf) } +// DecodeValueFromMVCCValue decodes and MVCCValue and returns the +// roachpb.Value portion without parsing the MVCCValueHeader. +// +// NB: Caller assumes that this function does not copy or re-allocate +// the underlying byte slice. +func DecodeValueFromMVCCValue(buf []byte) (roachpb.Value, error) { + if len(buf) == 0 { + // Tombstone with no header. + return roachpb.Value{}, nil + } + if len(buf) <= tagPos { + return roachpb.Value{}, errMVCCValueMissingTag + } + if buf[tagPos] != extendedEncodingSentinel { + return roachpb.Value{RawBytes: buf}, nil + } + + // Extended encoding + headerLen := binary.BigEndian.Uint32(buf) + headerSize := extendedPreludeSize + headerLen + if len(buf) < int(headerSize) { + return roachpb.Value{}, errMVCCValueMissingHeader + } + return roachpb.Value{RawBytes: buf[headerSize:]}, nil +} + // DecodeMVCCValueAndErr is a helper that can be called using the ([]byte, // error) pair returned from the iterator UnsafeValue(), Value() methods. func DecodeMVCCValueAndErr(buf []byte, err error) (MVCCValue, error) { diff --git a/pkg/storage/mvcc_value_test.go b/pkg/storage/mvcc_value_test.go index 9c721574a8a8..b5bfd6e4151a 100644 --- a/pkg/storage/mvcc_value_test.go +++ b/pkg/storage/mvcc_value_test.go @@ -80,26 +80,72 @@ func TestMVCCValueGetLocalTimestamp(t *testing.T) { } } +func TestEncodeMVCCValueForExport(t *testing.T) { + defer leaktest.AfterTest(t)() + var strVal, intVal roachpb.Value + strVal.SetString("foo") + intVal.SetInt(17) + + var importEpoch uint32 = 3 + tsHeader := enginepb.MVCCValueHeader{LocalTimestamp: hlc.ClockTimestamp{WallTime: 9}} + + valHeaderFull := tsHeader + valHeaderFull.ImportEpoch = importEpoch + + jobIDHeader := enginepb.MVCCValueHeader{ImportEpoch: importEpoch} + + testcases := map[string]struct { + val MVCCValue + expect MVCCValue + }{ + "noHeader": {val: MVCCValue{Value: intVal}, expect: MVCCValue{Value: intVal}}, + "tsHeader": {val: MVCCValue{MVCCValueHeader: tsHeader, Value: intVal}, expect: MVCCValue{Value: intVal}}, + "jobIDOnly": {val: MVCCValue{MVCCValueHeader: jobIDHeader, Value: intVal}, expect: MVCCValue{MVCCValueHeader: jobIDHeader, Value: intVal}}, + "fullHeader": {val: MVCCValue{MVCCValueHeader: valHeaderFull, Value: intVal}, expect: MVCCValue{MVCCValueHeader: jobIDHeader, Value: intVal}}, + } + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + encodedVal, err := EncodeMVCCValueForExport(tc.val) + require.NoError(t, err) + strippedMVCCVal, err := DecodeMVCCValue(encodedVal) + require.NoError(t, err) + require.Equal(t, tc.expect, strippedMVCCVal) + }) + } + +} func TestMVCCValueFormat(t *testing.T) { defer leaktest.AfterTest(t)() var strVal, intVal roachpb.Value strVal.SetString("foo") intVal.SetInt(17) + var importEpoch uint32 = 3 valHeader := enginepb.MVCCValueHeader{} valHeader.LocalTimestamp = hlc.ClockTimestamp{WallTime: 9} + valHeaderFull := valHeader + valHeaderFull.ImportEpoch = importEpoch + + valHeaderWithJobIDOnly := enginepb.MVCCValueHeader{ImportEpoch: importEpoch} + testcases := map[string]struct { val MVCCValue expect string }{ - "tombstone": {val: MVCCValue{}, expect: "/"}, - "bytes": {val: MVCCValue{Value: strVal}, expect: "/BYTES/foo"}, - "int": {val: MVCCValue{Value: intVal}, expect: "/INT/17"}, - "header+tombstone": {val: MVCCValue{MVCCValueHeader: valHeader}, expect: "{localTs=0.000000009,0}/"}, - "header+bytes": {val: MVCCValue{MVCCValueHeader: valHeader, Value: strVal}, expect: "{localTs=0.000000009,0}/BYTES/foo"}, - "header+int": {val: MVCCValue{MVCCValueHeader: valHeader, Value: intVal}, expect: "{localTs=0.000000009,0}/INT/17"}, + "tombstone": {val: MVCCValue{}, expect: "/"}, + "bytes": {val: MVCCValue{Value: strVal}, expect: "/BYTES/foo"}, + "int": {val: MVCCValue{Value: intVal}, expect: "/INT/17"}, + "header+tombstone": {val: MVCCValue{MVCCValueHeader: valHeader}, expect: "{localTs=0.000000009,0}/"}, + "header+bytes": {val: MVCCValue{MVCCValueHeader: valHeader, Value: strVal}, expect: "{localTs=0.000000009,0}/BYTES/foo"}, + "header+int": {val: MVCCValue{MVCCValueHeader: valHeader, Value: intVal}, expect: "{localTs=0.000000009,0}/INT/17"}, + "headerJobIDOnly+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobIDOnly}, expect: "{importEpoch=3}/"}, + "headerJobIDOnly+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobIDOnly, Value: strVal}, expect: "{importEpoch=3}/BYTES/foo"}, + "headerJobIDOnly+int": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobIDOnly, Value: intVal}, expect: "{importEpoch=3}/INT/17"}, + "headerFull+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderFull}, expect: "{localTs=0.000000009,0, importEpoch=3}/"}, + "headerFull+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderFull, Value: strVal}, expect: "{localTs=0.000000009,0, importEpoch=3}/BYTES/foo"}, + "headerFull+int": {val: MVCCValue{MVCCValueHeader: valHeaderFull, Value: intVal}, expect: "{localTs=0.000000009,0, importEpoch=3}/INT/17"}, } for name, tc := range testcases { t.Run(name, func(t *testing.T) { @@ -115,19 +161,31 @@ func TestEncodeDecodeMVCCValue(t *testing.T) { var strVal, intVal roachpb.Value strVal.SetString("foo") intVal.SetInt(17) + var importEpoch uint32 = 3 valHeader := enginepb.MVCCValueHeader{} valHeader.LocalTimestamp = hlc.ClockTimestamp{WallTime: 9} + valHeaderFull := valHeader + valHeaderFull.ImportEpoch = importEpoch + + valHeaderWithJobIDOnly := enginepb.MVCCValueHeader{ImportEpoch: importEpoch} + testcases := map[string]struct { val MVCCValue }{ - "tombstone": {val: MVCCValue{}}, - "bytes": {val: MVCCValue{Value: strVal}}, - "int": {val: MVCCValue{Value: intVal}}, - "header+tombstone": {val: MVCCValue{MVCCValueHeader: valHeader}}, - "header+bytes": {val: MVCCValue{MVCCValueHeader: valHeader, Value: strVal}}, - "header+int": {val: MVCCValue{MVCCValueHeader: valHeader, Value: intVal}}, + "tombstone": {val: MVCCValue{}}, + "bytes": {val: MVCCValue{Value: strVal}}, + "int": {val: MVCCValue{Value: intVal}}, + "header+tombstone": {val: MVCCValue{MVCCValueHeader: valHeader}}, + "header+bytes": {val: MVCCValue{MVCCValueHeader: valHeader, Value: strVal}}, + "header+int": {val: MVCCValue{MVCCValueHeader: valHeader, Value: intVal}}, + "headerJobIDOnly+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobIDOnly}}, + "headerJobIDOnly+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobIDOnly, Value: strVal}}, + "headerJobIDOnly+int": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobIDOnly, Value: intVal}}, + "headerFull+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderFull}}, + "headerFull+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderFull, Value: strVal}}, + "headerFull+int": {val: MVCCValue{MVCCValueHeader: valHeaderFull, Value: intVal}}, } w := echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name())) for name, tc := range testcases { @@ -153,6 +211,24 @@ func TestEncodeDecodeMVCCValue(t *testing.T) { return buf.String() })) + t.Run("DeocdeValueFromMVCCValue/"+name, func(t *testing.T) { + enc, err := EncodeMVCCValue(tc.val) + require.NoError(t, err) + assert.Equal(t, encodedMVCCValueSize(tc.val), len(enc)) + + dec, err := DecodeValueFromMVCCValue(enc) + require.NoError(t, err) + + if len(dec.RawBytes) == 0 { + dec.RawBytes = nil // normalize + } + + require.Equal(t, tc.val.Value, dec) + require.Equal(t, tc.val.IsTombstone(), len(dec.RawBytes) == 0) + isTombstone, err := EncodedMVCCValueIsTombstone(enc) + require.NoError(t, err) + require.Equal(t, tc.val.IsTombstone(), isTombstone) + }) } } @@ -175,6 +251,14 @@ func TestDecodeMVCCValueErrors(t *testing.T) { require.Equal(t, tc.expect, err) require.False(t, isTombstone) }) + t.Run("DecodeValueFromMVCCValue/"+name, func(t *testing.T) { + dec, err := DecodeValueFromMVCCValue(tc.enc) + require.Equal(t, tc.expect, err) + require.Zero(t, dec) + isTombstone, err := EncodedMVCCValueIsTombstone(tc.enc) + require.Equal(t, tc.expect, err) + require.False(t, isTombstone) + }) } } @@ -183,15 +267,19 @@ func mvccValueBenchmarkConfigs() ( values map[string]roachpb.Value, ) { headers = map[string]enginepb.MVCCValueHeader{ - "empty": {}, - "local walltime": {LocalTimestamp: hlc.ClockTimestamp{WallTime: 1643550788737652545}}, - "local walltime+logical": {LocalTimestamp: hlc.ClockTimestamp{WallTime: 1643550788737652545, Logical: 4096}}, - "omit in rangefeeds": {OmitInRangefeeds: true}, + "empty": {}, + "jobID": {ImportEpoch: 3}, + "local walltime": {LocalTimestamp: hlc.ClockTimestamp{WallTime: 1643550788737652545}}, + "local walltime+logical": {LocalTimestamp: hlc.ClockTimestamp{WallTime: 1643550788737652545, Logical: 4096}}, + "omit in rangefeeds": {OmitInRangefeeds: true}, + "local walltime+jobID": {LocalTimestamp: hlc.ClockTimestamp{WallTime: 1643550788737652545}, ImportEpoch: 3}, + "local walltime+logical+jobID": {LocalTimestamp: hlc.ClockTimestamp{WallTime: 1643550788737652545, Logical: 4096}, ImportEpoch: 3}, } if testing.Short() { // Reduce the number of configurations in short mode. delete(headers, "local walltime") delete(headers, "omit in rangefeeds") + delete(headers, "local walltime+jobID") } values = map[string]roachpb.Value{ "tombstone": {}, @@ -221,6 +309,26 @@ func BenchmarkEncodeMVCCValue(b *testing.B) { } } +func BenchmarkEncodeMVCCValueForExport(b *testing.B) { + DisableMetamorphicSimpleValueEncoding(b) + headers, values := mvccValueBenchmarkConfigs() + for hDesc, h := range headers { + for vDesc, v := range values { + name := fmt.Sprintf("header=%s/value=%s", hDesc, vDesc) + mvccValue := MVCCValue{MVCCValueHeader: h, Value: v} + b.Run(name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + res, err := EncodeMVCCValueForExport(mvccValue) + if err != nil { // for performance + require.NoError(b, err) + } + _ = res + } + }) + } + } +} + func BenchmarkDecodeMVCCValue(b *testing.B) { headers, values := mvccValueBenchmarkConfigs() for hDesc, h := range headers { @@ -254,6 +362,27 @@ func BenchmarkDecodeMVCCValue(b *testing.B) { } } +func BenchmarkDecodeValueFromMVCCValue(b *testing.B) { + headers, values := mvccValueBenchmarkConfigs() + for hDesc, h := range headers { + for vDesc, v := range values { + name := fmt.Sprintf("header=%s/value=%s", hDesc, vDesc) + mvccValue := MVCCValue{MVCCValueHeader: h, Value: v} + buf, err := EncodeMVCCValue(mvccValue) + require.NoError(b, err) + b.Run(name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + res, err := DecodeValueFromMVCCValue(buf) + if err != nil { // for performance + require.NoError(b, err) + } + _ = res + } + }) + } + } +} + func BenchmarkMVCCValueIsTombstone(b *testing.B) { headers, values := mvccValueBenchmarkConfigs() for hDesc, h := range headers { diff --git a/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_bytes b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_bytes new file mode 100644 index 000000000000..639f29abdbb1 --- /dev/null +++ b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_bytes @@ -0,0 +1,3 @@ +echo +---- +encoded: 00000006650a02080920030000000003666f6f diff --git a/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_int b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_int new file mode 100644 index 000000000000..aeeffb2ef055 --- /dev/null +++ b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_int @@ -0,0 +1,3 @@ +echo +---- +encoded: 00000006650a0208092003000000000122 diff --git a/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_tombstone b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_tombstone new file mode 100644 index 000000000000..767d07819092 --- /dev/null +++ b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_tombstone @@ -0,0 +1,3 @@ +echo +---- +encoded: 00000006650a0208092003 diff --git a/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerJobIDOnly_bytes b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerJobIDOnly_bytes new file mode 100644 index 000000000000..82d66cbef4ad --- /dev/null +++ b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerJobIDOnly_bytes @@ -0,0 +1,3 @@ +echo +---- +encoded: 00000004650a0020030000000003666f6f diff --git a/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerJobIDOnly_int b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerJobIDOnly_int new file mode 100644 index 000000000000..5b3bb6afacbd --- /dev/null +++ b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerJobIDOnly_int @@ -0,0 +1,3 @@ +echo +---- +encoded: 00000004650a002003000000000122 diff --git a/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerJobIDOnly_tombstone b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerJobIDOnly_tombstone new file mode 100644 index 000000000000..2f360470fbc7 --- /dev/null +++ b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerJobIDOnly_tombstone @@ -0,0 +1,3 @@ +echo +---- +encoded: 00000004650a002003