From 6ed2789c28fc04b620aaf19391864e89ddb3f0d0 Mon Sep 17 00:00:00 2001
From: Michael Butler
Date: Thu, 28 Jul 2022 15:26:08 -0400
Subject: [PATCH] kv/bulk: write ImportEpoch to each MVCCValue during IMPORT

This patch makes IMPORT INTO on a non-empty table write the table's
ImportEpoch to each ingested MVCCValue, via the SSTBatcher. In a future PR,
the ImportEpoch will be used to track and roll back an IMPORT INTO. This
additional information will allow IMPORTing tables to be backed up and
restored, as described in this
[RFC](https://docs.google.com/document/d/16TbkFznqbsu3mialSw6o1sxOEn1pKhhJ9HTxNdw0-WE/edit#heading=h.bpox0bmkz77i).

Informs #76722

Release note: None

Co-authored-by: Steven Danna
---
 pkg/ccl/backupccl/restore_data_processor.go |   1 +
 pkg/kv/bulk/buffering_adder.go              |  20 +++-
 pkg/kv/bulk/sst_batcher.go                  |  21 ++-
 pkg/kv/bulk/sst_batcher_test.go             |  76 ++++++++++++-
 pkg/kv/kvserver/kvserverbase/bulk_adder.go  |   6 +
 pkg/sql/importer/BUILD.bazel                |   3 +
 pkg/sql/importer/import_job.go              |   8 ++
 pkg/sql/importer/import_mvcc_test.go        | 115 ++++++++++++++++++++
 pkg/sql/importer/import_processor.go        |  11 ++
 pkg/sql/importer/import_processor_test.go   |   6 +-
 10 files changed, 258 insertions(+), 9 deletions(-)
 create mode 100644 pkg/sql/importer/import_mvcc_test.go

diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go
index e2ecf7d3ea4e..903081ecc841 100644
--- a/pkg/ccl/backupccl/restore_data_processor.go
+++ b/pkg/ccl/backupccl/restore_data_processor.go
@@ -581,6 +581,7 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
 		if verbose {
 			log.Infof(ctx, "Put %s -> %s", key.Key, value.PrettyPrint())
 		}
+		// TODO(msbutler): ingest the ImportEpoch from an in-progress import
 		if err := batcher.AddMVCCKey(ctx, key, value.RawBytes); err != nil {
 			return summary, errors.Wrapf(err, "adding to batch: %s -> %s", key, value.PrettyPrint())
 		}
diff --git a/pkg/kv/bulk/buffering_adder.go b/pkg/kv/bulk/buffering_adder.go
index 3de6446cdb60..ef585396b84a 100644
--- a/pkg/kv/bulk/buffering_adder.go
+++ b/pkg/kv/bulk/buffering_adder.go
@@ -63,6 +63,12 @@ type BufferingAdder struct {
 	// name of the BufferingAdder for the purpose of logging only.
 	name string
 
+	// importEpoch specifies the ImportEpoch of the table into which the
+	// BufferingAdder is ingesting data as part of an IMPORT INTO job. If
+	// specified, the BufferingAdder's SSTBatcher will write the import
+	// epoch into each versioned value's metadata.
+	importEpoch uint32
+
 	bulkMon *mon.BytesMonitor
 	memAcc  mon.BoundAccount
 
@@ -96,7 +102,8 @@ func MakeBulkAdder(
 	}
 
 	b := &BufferingAdder{
-		name: opts.Name,
+		name:        opts.Name,
+		importEpoch: opts.ImportEpoch,
 		sink: SSTBatcher{
 			name: opts.Name,
 			db:   db,
@@ -303,8 +310,15 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
 
 	for i := range b.curBuf.entries {
 		mvccKey.Key = b.curBuf.Key(i)
-		if err := b.sink.AddMVCCKey(ctx, mvccKey, b.curBuf.Value(i)); err != nil {
-			return err
+		if b.importEpoch != 0 {
+			if err := b.sink.AddMVCCKeyWithImportEpoch(ctx, mvccKey, b.curBuf.Value(i),
+				b.importEpoch); err != nil {
+				return err
+			}
+		} else {
+			if err := b.sink.AddMVCCKey(ctx, mvccKey, b.curBuf.Value(i)); err != nil {
+				return err
+			}
 		}
 	}
 	if err := b.sink.Flush(ctx); err != nil {
diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go
index 51dd1dc4bb23..e19ec6b05a03 100644
--- a/pkg/kv/bulk/sst_batcher.go
+++ b/pkg/kv/bulk/sst_batcher.go
@@ -329,6 +329,24 @@ func (b *SSTBatcher) SetOnFlush(onFlush func(summary kvpb.BulkOpSummary)) {
 	b.mu.onFlush = onFlush
 }
 
+// AddMVCCKeyWithImportEpoch adds a key+timestamp/value pair to the batch
+// (flushing if needed), stamping the given importEpoch into the value's
+// MVCCValueHeader before it is encoded.
+func (b *SSTBatcher) AddMVCCKeyWithImportEpoch(
+	ctx context.Context, key storage.MVCCKey, value []byte, importEpoch uint32,
+) error {
+	mvccVal, err := storage.DecodeMVCCValue(value)
+	if err != nil {
+		return err
+	}
+	mvccVal.MVCCValueHeader.ImportEpoch = importEpoch
+	encVal, err := storage.EncodeMVCCValue(mvccVal)
+	if err != nil {
+		return err
+	}
+	return b.AddMVCCKey(ctx, key, encVal)
+}
+
 // AddMVCCKey adds a key+timestamp/value pair to the batch (flushing if needed).
 // This is only for callers that want to control the timestamp on individual
 // keys -- like RESTORE where we want the restored data to look like the backup.
@@ -389,8 +404,7 @@ func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value
 	if !b.disallowShadowingBelow.IsEmpty() {
 		b.updateMVCCStats(key, value)
 	}
-
-	return b.sstWriter.Put(key, value)
+	return b.sstWriter.PutRawMVCC(key, value)
 }
 
 // Reset clears all state in the batcher and prepares it for reuse.
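A note on what the new batcher method does to the bytes it writes: the following standalone sketch (not part of the patch) walks through the encode/decode round-trip that AddMVCCKeyWithImportEpoch performs on each raw value. It assumes the storage and roachpb APIs as they appear in this tree; the epoch value 3 mirrors the DummyImportEpoch used in the tests below, and the package main scaffolding is illustrative only.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
)

func main() {
	// Build a plain roachpb.Value, as SQL ingestion would.
	var v roachpb.Value
	v.SetString("myHumbleValue")

	// Wrap it in an MVCCValue and stamp the epoch in the value header,
	// mirroring what AddMVCCKeyWithImportEpoch does before handing the
	// encoded bytes to the SST writer.
	mvccVal := storage.MVCCValue{Value: v}
	mvccVal.MVCCValueHeader.ImportEpoch = 3

	// A value with a non-empty header uses the extended encoding, and the
	// batcher hands it to the SST writer as already-encoded bytes (hence
	// the Put -> PutRawMVCC change above). Decoding recovers both the
	// header and the original value.
	enc, err := storage.EncodeMVCCValue(mvccVal)
	if err != nil {
		panic(err)
	}
	dec, err := storage.DecodeMVCCValue(enc)
	if err != nil {
		panic(err)
	}
	fmt.Println(dec.ImportEpoch) // 3
}

Carrying the epoch in the MVCCValueHeader rather than in the key or the roachpb.Value keeps the key encoding stable while still letting the planned rollback (and backup/restore of in-progress imports) identify which rows a given IMPORT wrote.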
diff --git a/pkg/kv/bulk/sst_batcher_test.go b/pkg/kv/bulk/sst_batcher_test.go
index 4912a4bbe3e3..2a74a7daa749 100644
--- a/pkg/kv/bulk/sst_batcher_test.go
+++ b/pkg/kv/bulk/sst_batcher_test.go
@@ -308,7 +308,8 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
 	mem := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil)
 	reqs := limit.MakeConcurrentRequestLimiter("reqs", 1000)
 	b, err := bulk.MakeBulkAdder(
-		ctx, kvDB, mockCache, s.ClusterSettings(), ts, kvserverbase.BulkAdderOptions{MaxBufferSize: batchSize}, mem, reqs,
+		ctx, kvDB, mockCache, s.ClusterSettings(), ts,
+		kvserverbase.BulkAdderOptions{MaxBufferSize: batchSize}, mem, reqs,
 	)
 	require.NoError(t, err)
 
@@ -361,3 +362,76 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
 		})
 	}
 }
+
+var DummyImportEpoch uint32 = 3
+
+func TestImportEpochIngestion(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+
+	mem := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil)
+	reqs := limit.MakeConcurrentRequestLimiter("reqs", 1000)
+	s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
+	defer s.Stopper().Stop(ctx)
+
+	b, err := bulk.MakeTestingSSTBatcher(ctx, kvDB, s.ClusterSettings(),
+		false, true, mem.MakeConcurrentBoundAccount(), reqs)
+	require.NoError(t, err)
+	defer b.Close(ctx)
+
+	startKey := storageutils.PointKey("a", 1)
+	endKey := storageutils.PointKey("b", 1)
+	value := storageutils.StringValueRaw("myHumbleValue")
+	mvccValue, err := storage.DecodeMVCCValue(value)
+	require.NoError(t, err)
+
+	require.NoError(t, b.AddMVCCKeyWithImportEpoch(ctx, startKey, value, DummyImportEpoch))
+	require.NoError(t, b.AddMVCCKeyWithImportEpoch(ctx, endKey, value, DummyImportEpoch))
+	require.NoError(t, b.Flush(ctx))
+
+	// Check that the ingested keys contain the dummy ImportEpoch.
+	req := &kvpb.ExportRequest{
+		RequestHeader: kvpb.RequestHeader{
+			Key:    startKey.Key,
+			EndKey: endKey.Key,
+		},
+		MVCCFilter: kvpb.MVCCFilter_All,
+		StartTime:  hlc.Timestamp{},
+	}
+
+	header := kvpb.Header{Timestamp: s.Clock().Now()}
+	resp, roachErr := kv.SendWrappedWith(ctx,
+		kvDB.NonTransactionalSender(), header, req)
+	require.NoError(t, roachErr.GoError())
+	iterOpts := storage.IterOptions{
+		KeyTypes:   storage.IterKeyTypePointsOnly,
+		LowerBound: startKey.Key,
+		UpperBound: endKey.Key,
+	}
+
+	checkedEpoch := false
+	for _, file := range resp.(*kvpb.ExportResponse).Files {
+		it, err := storage.NewMemSSTIterator(file.SST, false /* verify */, iterOpts)
+		require.NoError(t, err)
+		defer it.Close()
+		for it.SeekGE(storage.NilKey); ; it.Next() {
+			ok, err := it.Valid()
+			require.NoError(t, err)
+			if !ok {
+				break
+			}
+			rawVal, err := it.UnsafeValue()
+			require.NoError(t, err)
+			val, err := storage.DecodeMVCCValue(rawVal)
+			require.NoError(t, err)
+			require.Equal(t, startKey, it.UnsafeKey())
+			require.Equal(t, mvccValue.Value, val.Value)
+			require.Equal(t, DummyImportEpoch, val.ImportEpoch)
+			require.Equal(t, hlc.ClockTimestamp{}, val.LocalTimestamp)
+			checkedEpoch = true
+		}
+	}
+	require.Equal(t, true, checkedEpoch)
+}
diff --git a/pkg/kv/kvserver/kvserverbase/bulk_adder.go b/pkg/kv/kvserver/kvserverbase/bulk_adder.go
index 62323091a093..0841978c88f5 100644
--- a/pkg/kv/kvserver/kvserverbase/bulk_adder.go
+++ b/pkg/kv/kvserver/kvserverbase/bulk_adder.go
@@ -69,6 +69,12 @@ type BulkAdderOptions struct {
 	// the first buffer to pick split points in the hope it is a representative
 	// sample of the overall input.
 	InitialSplitsIfUnordered int
+
+	// ImportEpoch specifies the ImportEpoch of the table into which the
+	// BulkAdder is ingesting data as part of an IMPORT INTO job. If
+	// specified, the BulkAdder's SSTBatcher will write the import epoch
+	// into each versioned value's metadata.
+	ImportEpoch uint32
 }
 
 // BulkAdderFactory describes a factory function for BulkAdders.
diff --git a/pkg/sql/importer/BUILD.bazel b/pkg/sql/importer/BUILD.bazel
index 323e5d8acb2e..6a8008f0e59a 100644
--- a/pkg/sql/importer/BUILD.bazel
+++ b/pkg/sql/importer/BUILD.bazel
@@ -148,6 +148,7 @@ go_test(
         "exportparquet_test.go",
         "import_csv_mark_redaction_test.go",
         "import_into_test.go",
+        "import_mvcc_test.go",
         "import_processor_test.go",
         "import_stmt_test.go",
         "main_test.go",
@@ -187,6 +188,7 @@ go_test(
         "//pkg/keys",
         "//pkg/keyvisualizer",
         "//pkg/kv",
+        "//pkg/kv/kvclient/kvcoord",
         "//pkg/kv/kvpb",
         "//pkg/kv/kvserver",
         "//pkg/kv/kvserver/kvserverbase",
@@ -229,6 +231,7 @@ go_test(
         "//pkg/sql/stats",
         "//pkg/sql/tests",
         "//pkg/sql/types",
+        "//pkg/storage",
         "//pkg/testutils",
         "//pkg/testutils/datapathutils",
         "//pkg/testutils/jobutils",
diff --git a/pkg/sql/importer/import_job.go b/pkg/sql/importer/import_job.go
index 30fedbe7936a..53651b22ef4e 100644
--- a/pkg/sql/importer/import_job.go
+++ b/pkg/sql/importer/import_job.go
@@ -292,6 +292,14 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
 		}
 	}
 
+	if len(details.Tables) > 1 {
+		for _, tab := range details.Tables {
+			if !tab.IsNew {
+				return errors.AssertionFailedf("all tables in multi-table import must be new")
+			}
+		}
+	}
+
 	procsPerNode := int(processorsPerNode.Get(&p.ExecCfg().Settings.SV))
 
 	res, err := ingestWithRetry(ctx, p, r.job, tables, typeDescs, files, format, details.Walltime,
diff --git a/pkg/sql/importer/import_mvcc_test.go b/pkg/sql/importer/import_mvcc_test.go
new file mode 100644
index 000000000000..9b979471fbcc
--- /dev/null
+++ b/pkg/sql/importer/import_mvcc_test.go
@@ -0,0 +1,115 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package importer_test
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
+	"github.com/cockroachdb/cockroach/pkg/storage"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/stretchr/testify/require"
+)
+
+// TestMVCCValueHeaderImportEpoch tests that the table's ImportEpoch is
+// properly stored in the MVCCValueHeader of an imported key's MVCCValue.
+func TestMVCCValueHeaderImportEpoch(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	server, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
+	s := server.ApplicationLayer()
+	defer server.Stopper().Stop(ctx)
+	sqlDB := sqlutils.MakeSQLRunner(db)
+
+	sqlDB.Exec(t, `CREATE DATABASE d`)
+
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method == "GET" {
+			fmt.Fprint(w, "1")
+		}
+	}))
+	defer srv.Close()
+
+	// Create a table where the first row (in sort order) comes from an IMPORT
+	// while the second comes from an INSERT.
+	sqlDB.Exec(t, `CREATE TABLE d.t (a INT8)`)
+	sqlDB.Exec(t, `INSERT INTO d.t VALUES ('2')`)
+	sqlDB.Exec(t, `IMPORT INTO d.t CSV DATA ($1)`, srv.URL)
+
+	// Send an export request to iterate over the keys in the table.
+	var tableID uint32
+	sqlDB.QueryRow(t, `SELECT id FROM system.namespace WHERE name = $1`,
+		"t").Scan(&tableID)
+
+	startKey := s.Codec().TablePrefix(tableID)
+	endKey := startKey.PrefixEnd()
+
+	req := &kvpb.ExportRequest{
+		RequestHeader: kvpb.RequestHeader{
+			Key:    startKey,
+			EndKey: endKey,
+		},
+		MVCCFilter: kvpb.MVCCFilter_All,
+		StartTime:  hlc.Timestamp{},
+	}
+
+	header := kvpb.Header{Timestamp: s.Clock().Now()}
+	resp, roachErr := kv.SendWrappedWith(ctx,
+		s.DistSenderI().(*kvcoord.DistSender), header, req)
+	require.NoError(t, roachErr.GoError())
+
+	iterOpts := storage.IterOptions{
+		KeyTypes:   storage.IterKeyTypePointsOnly,
+		LowerBound: startKey,
+		UpperBound: endKey,
+	}
+
+	// Ensure there are 2 keys in the span, and only the first one contains ImportEpoch metadata.
+	keyCount := 0
+	for _, file := range resp.(*kvpb.ExportResponse).Files {
+		it, err := storage.NewMemSSTIterator(file.SST, false /* verify */, iterOpts)
+		require.NoError(t, err)
+		defer it.Close()
+		for it.SeekGE(storage.NilKey); ; it.Next() {
+			ok, err := it.Valid()
+			require.NoError(t, err)
+			if !ok {
+				break
+			}
+			rawVal, err := it.UnsafeValue()
+			require.NoError(t, err)
+			val, err := storage.DecodeMVCCValue(rawVal)
+			require.NoError(t, err)
+			if keyCount == 0 {
+				require.NotEqual(t, uint32(0), val.ImportEpoch)
+			} else if keyCount == 1 {
+				require.Equal(t, uint32(0), val.ImportEpoch)
+			} else {
+				t.Fatal("more than 2 keys in the table")
+			}
+			require.Equal(t, hlc.ClockTimestamp{}, val.LocalTimestamp)
+			keyCount++
+		}
+	}
+}
diff --git a/pkg/sql/importer/import_processor.go b/pkg/sql/importer/import_processor.go
index ef79797eac16..681618217d64 100644
--- a/pkg/sql/importer/import_processor.go
+++ b/pkg/sql/importer/import_processor.go
@@ -18,6 +18,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
@@ -389,6 +390,14 @@ func ingestKvs(
 	// will hog memory as it tries to grow more aggressively.
minBufferSize, maxBufferSize := importBufferConfigSizes(flowCtx.Cfg.Settings, true /* isPKAdder */) + + var bulkAdderImportEpoch uint32 + if flowCtx.Cfg.Settings.Version.IsActive(ctx, clusterversion.V24_1) && len(spec.Tables) == 1 { + for _, v := range spec.Tables { + bulkAdderImportEpoch = v.Desc.ImportEpoch + } + + } pkIndexAdder, err := flowCtx.Cfg.BulkAdder(ctx, flowCtx.Cfg.DB.KV(), writeTS, kvserverbase.BulkAdderOptions{ Name: pkAdderName, DisallowShadowingBelow: writeTS, @@ -397,6 +406,7 @@ func ingestKvs( MaxBufferSize: maxBufferSize, InitialSplitsIfUnordered: int(spec.InitialSplits), WriteAtBatchTimestamp: true, + ImportEpoch: bulkAdderImportEpoch, }) if err != nil { return nil, err @@ -413,6 +423,7 @@ func ingestKvs( MaxBufferSize: maxBufferSize, InitialSplitsIfUnordered: int(spec.InitialSplits), WriteAtBatchTimestamp: true, + ImportEpoch: bulkAdderImportEpoch, }) if err != nil { return nil, err diff --git a/pkg/sql/importer/import_processor_test.go b/pkg/sql/importer/import_processor_test.go index 3a959327ae6f..a1ecc42be252 100644 --- a/pkg/sql/importer/import_processor_test.go +++ b/pkg/sql/importer/import_processor_test.go @@ -243,7 +243,7 @@ func TestImportIgnoresProcessedFiles(t *testing.T) { Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ JobRegistry: &jobs.Registry{}, - Settings: &cluster.Settings{}, + Settings: cluster.MakeTestingClusterSettings(), ExternalStorage: externalStorageFactory, DB: fakeDB{}, BulkAdder: func( @@ -368,7 +368,7 @@ func TestImportHonorsResumePosition(t *testing.T) { Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ JobRegistry: &jobs.Registry{}, - Settings: &cluster.Settings{}, + Settings: cluster.MakeTestingClusterSettings(), ExternalStorage: externalStorageFactory, DB: fakeDB{}, BulkAdder: func( @@ -502,7 +502,7 @@ func TestImportHandlesDuplicateKVs(t *testing.T) { Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ JobRegistry: &jobs.Registry{}, - Settings: &cluster.Settings{}, + Settings: cluster.MakeTestingClusterSettings(), ExternalStorage: externalStorageFactory, DB: fakeDB{}, BulkAdder: func(
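The cluster-version gate in the ingestKvs hunk above is easy to miss, so here is a hypothetical helper (not in the patch; the name importEpochFor is invented for illustration) that distills when an epoch is propagated to the BulkAdders, assuming the spec.Tables map type used in import_processor.go. It returns zero, meaning "do not stamp values", unless the import targets exactly one table on a sufficiently upgraded cluster.

package importer

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
)

// importEpochFor returns the ImportEpoch that the bulk adders should stamp
// on ingested values, or zero when stamping must be skipped: before V24_1
// is finalized, older nodes may not expect an ImportEpoch in the value
// header, and a multi-table import only targets new tables (enforced in
// import_job.go above), which have no epoch to roll back.
func importEpochFor(
	ctx context.Context,
	st *cluster.Settings,
	tables map[string]*execinfrapb.ReadImportDataSpec_ImportTable,
) uint32 {
	if !st.Version.IsActive(ctx, clusterversion.V24_1) || len(tables) != 1 {
		return 0
	}
	for _, tab := range tables {
		return tab.Desc.ImportEpoch
	}
	return 0
}

The result would feed BulkAdderOptions.ImportEpoch for both the pkIndexAdder and the indexAdder, exactly where the two hunks above add the field.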