Skip to content

Commit

Permalink
kv/bulk: write ImportEpoch to each MVCCValue during IMPORT
Browse files Browse the repository at this point in the history
This patch makes IMPORT INTO on a non-empty table write the table's
ImportEpoch to each ingested MVCC Value, via the SSTBatcher. In a
future PR, the ImportEpoch will be used to track and rollback an
IMPORT INTO. This additional information will allow IMPORTing tables
to be backed up and restored, as described in this
[RFC](https://docs.google.com/document/d/16TbkFznqbsu3mialSw6o1sxOEn1pKhhJ9HTxNdw0-WE/edit#heading=h.bpox0bmkz77i).

Informs #76722

Release note: None

Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
msbutler and stevendanna committed Mar 11, 2024
1 parent 9b6f65f commit 6ed2789
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 9 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
if verbose {
log.Infof(ctx, "Put %s -> %s", key.Key, value.PrettyPrint())
}
// TODO(msbutler): ingest the ImportEpoch from an in progress import
if err := batcher.AddMVCCKey(ctx, key, value.RawBytes); err != nil {
return summary, errors.Wrapf(err, "adding to batch: %s -> %s", key, value.PrettyPrint())
}
Expand Down
20 changes: 17 additions & 3 deletions pkg/kv/bulk/buffering_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ type BufferingAdder struct {
// name of the BufferingAdder for the purpose of logging only.
name string

// importEpoch specifies the ImportEpoch of the table the BufferingAdder
// is ingesting data as part of an IMPORT INTO job. If specified, the Bulk
// Adder's SSTBatcher will write the import epoch to each versioned value's
// metadata.
importEpoch uint32

bulkMon *mon.BytesMonitor
memAcc mon.BoundAccount

Expand Down Expand Up @@ -96,7 +102,8 @@ func MakeBulkAdder(
}

b := &BufferingAdder{
name: opts.Name,
name: opts.Name,
importEpoch: opts.ImportEpoch,
sink: SSTBatcher{
name: opts.Name,
db: db,
Expand Down Expand Up @@ -303,8 +310,15 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {

for i := range b.curBuf.entries {
mvccKey.Key = b.curBuf.Key(i)
if err := b.sink.AddMVCCKey(ctx, mvccKey, b.curBuf.Value(i)); err != nil {
return err
if b.importEpoch != 0 {
if err := b.sink.AddMVCCKeyWithImportEpoch(ctx, mvccKey, b.curBuf.Value(i),
b.importEpoch); err != nil {
return err
}
} else {
if err := b.sink.AddMVCCKey(ctx, mvccKey, b.curBuf.Value(i)); err != nil {
return err
}
}
}
if err := b.sink.Flush(ctx); err != nil {
Expand Down
18 changes: 16 additions & 2 deletions pkg/kv/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,21 @@ func (b *SSTBatcher) SetOnFlush(onFlush func(summary kvpb.BulkOpSummary)) {
b.mu.onFlush = onFlush
}

func (b *SSTBatcher) AddMVCCKeyWithImportEpoch(
ctx context.Context, key storage.MVCCKey, value []byte, importEpoch uint32,
) error {
mvccVal, err := storage.DecodeMVCCValue(value)
if err != nil {
return err
}
mvccVal.MVCCValueHeader.ImportEpoch = importEpoch
encVal, err := storage.EncodeMVCCValue(mvccVal)
if err != nil {
return err
}
return b.AddMVCCKey(ctx, key, encVal)
}

// AddMVCCKey adds a key+timestamp/value pair to the batch (flushing if needed).
// This is only for callers that want to control the timestamp on individual
// keys -- like RESTORE where we want the restored data to look like the backup.
Expand Down Expand Up @@ -389,8 +404,7 @@ func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value
if !b.disallowShadowingBelow.IsEmpty() {
b.updateMVCCStats(key, value)
}

return b.sstWriter.Put(key, value)
return b.sstWriter.PutRawMVCC(key, value)
}

// Reset clears all state in the batcher and prepares it for reuse.
Expand Down
76 changes: 75 additions & 1 deletion pkg/kv/bulk/sst_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,8 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
mem := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil)
reqs := limit.MakeConcurrentRequestLimiter("reqs", 1000)
b, err := bulk.MakeBulkAdder(
ctx, kvDB, mockCache, s.ClusterSettings(), ts, kvserverbase.BulkAdderOptions{MaxBufferSize: batchSize}, mem, reqs,
ctx, kvDB, mockCache, s.ClusterSettings(), ts,
kvserverbase.BulkAdderOptions{MaxBufferSize: batchSize}, mem, reqs,
)
require.NoError(t, err)

Expand Down Expand Up @@ -361,3 +362,76 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
})
}
}

var DummyImportEpoch uint32 = 3

func TestImportEpochIngestion(t *testing.T) {
defer leaktest.AfterTest(t)()

defer log.Scope(t).Close(t)
ctx := context.Background()

mem := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil)
reqs := limit.MakeConcurrentRequestLimiter("reqs", 1000)
s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

b, err := bulk.MakeTestingSSTBatcher(ctx, kvDB, s.ClusterSettings(),
false, true, mem.MakeConcurrentBoundAccount(), reqs)
require.NoError(t, err)
defer b.Close(ctx)

startKey := storageutils.PointKey("a", 1)
endKey := storageutils.PointKey("b", 1)
value := storageutils.StringValueRaw("myHumbleValue")
mvccValue, err := storage.DecodeMVCCValue(value)
require.NoError(t, err)

require.NoError(t, b.AddMVCCKeyWithImportEpoch(ctx, startKey, value, DummyImportEpoch))
require.NoError(t, b.AddMVCCKeyWithImportEpoch(ctx, endKey, value, DummyImportEpoch))
require.NoError(t, b.Flush(ctx))

// Check that ingested key contains the dummy job ID
req := &kvpb.ExportRequest{
RequestHeader: kvpb.RequestHeader{
Key: startKey.Key,
EndKey: endKey.Key,
},
MVCCFilter: kvpb.MVCCFilter_All,
StartTime: hlc.Timestamp{},
}

header := kvpb.Header{Timestamp: s.Clock().Now()}
resp, roachErr := kv.SendWrappedWith(ctx,
kvDB.NonTransactionalSender(), header, req)
require.NoError(t, roachErr.GoError())
iterOpts := storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsOnly,
LowerBound: startKey.Key,
UpperBound: endKey.Key,
}

checkedJobId := false
for _, file := range resp.(*kvpb.ExportResponse).Files {
it, err := storage.NewMemSSTIterator(file.SST, false /* verify */, iterOpts)
require.NoError(t, err)
defer it.Close()
for it.SeekGE(storage.NilKey); ; it.Next() {
ok, err := it.Valid()
require.NoError(t, err)
if !ok {
break
}
rawVal, err := it.UnsafeValue()
require.NoError(t, err)
val, err := storage.DecodeMVCCValue(rawVal)
require.NoError(t, err)
require.Equal(t, startKey, it.UnsafeKey())
require.Equal(t, mvccValue.Value, val.Value)
require.Equal(t, DummyImportEpoch, val.ImportEpoch)
require.Equal(t, hlc.ClockTimestamp{}, val.LocalTimestamp)
checkedJobId = true
}
}
require.Equal(t, true, checkedJobId)
}
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/kvserverbase/bulk_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ type BulkAdderOptions struct {
// the first buffer to pick split points in the hope it is a representative
// sample of the overall input.
InitialSplitsIfUnordered int

// ImportEpoch specifies the ImportEpoch of the table the BulkAdder
// is ingesting data into as part of an IMPORT INTO job. If specified, the Bulk
// Adder's SSTBatcher will write the import epoch to each versioned value's
// metadata.
ImportEpoch uint32
}

// BulkAdderFactory describes a factory function for BulkAdders.
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ go_test(
"exportparquet_test.go",
"import_csv_mark_redaction_test.go",
"import_into_test.go",
"import_mvcc_test.go",
"import_processor_test.go",
"import_stmt_test.go",
"main_test.go",
Expand Down Expand Up @@ -187,6 +188,7 @@ go_test(
"//pkg/keys",
"//pkg/keyvisualizer",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
Expand Down Expand Up @@ -229,6 +231,7 @@ go_test(
"//pkg/sql/stats",
"//pkg/sql/tests",
"//pkg/sql/types",
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/datapathutils",
"//pkg/testutils/jobutils",
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/importer/import_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,14 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
}
}

if len(details.Tables) > 1 {
for _, tab := range details.Tables {
if !tab.IsNew {
return errors.AssertionFailedf("all tables in multi-table import must be new")
}
}
}

procsPerNode := int(processorsPerNode.Get(&p.ExecCfg().Settings.SV))

res, err := ingestWithRetry(ctx, p, r.job, tables, typeDescs, files, format, details.Walltime,
Expand Down
115 changes: 115 additions & 0 deletions pkg/sql/importer/import_mvcc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package importer_test

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

// TestMVCCValueHeaderImportEpoch tests that the import job ID is properly
// stored in the MVCCValueHeader in an imported key's MVCCValue.
func TestMVCCValueHeaderImportEpoch(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
server, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
s := server.ApplicationLayer()
defer server.Stopper().Stop(ctx)
sqlDB := sqlutils.MakeSQLRunner(db)

sqlDB.Exec(t, `CREATE DATABASE d`)

srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
fmt.Fprint(w, "1")
}
}))
defer srv.Close()

// Create a table where the first row ( in sort order) comes from an IMPORT
// while the second comes from an INSERT.
sqlDB.Exec(t, `CREATE TABLE d.t (a INT8)`)
sqlDB.Exec(t, `INSERT INTO d.t VALUES ('2')`)
sqlDB.Exec(t, `IMPORT INTO d.t CSV DATA ($1)`, srv.URL)

// Conduct an export request to iterate over the keys in the table.
var tableID uint32
sqlDB.QueryRow(t, `SELECT id FROM system.namespace WHERE name = $1`,
"t").Scan(&tableID)

startKey := s.Codec().TablePrefix(tableID)
endKey := startKey.PrefixEnd()

req := &kvpb.ExportRequest{
RequestHeader: kvpb.RequestHeader{
Key: startKey,
EndKey: endKey,
},
MVCCFilter: kvpb.MVCCFilter_All,
StartTime: hlc.Timestamp{},
}

header := kvpb.Header{Timestamp: s.Clock().Now()}
resp, roachErr := kv.SendWrappedWith(ctx,
s.DistSenderI().(*kvcoord.DistSender), header, req)
require.NoError(t, roachErr.GoError())

iterOpts := storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsOnly,
LowerBound: startKey,
UpperBound: endKey,
}

// Ensure there are 2 keys in the span, and only the first one contains job ID metadata
keyCount := 0
for _, file := range resp.(*kvpb.ExportResponse).Files {
it, err := storage.NewMemSSTIterator(file.SST, false /* verify */, iterOpts)
require.NoError(t, err)
defer it.Close()
for it.SeekGE(storage.NilKey); ; it.Next() {
ok, err := it.Valid()
require.NoError(t, err)
if !ok {
break
}
rawVal, err := it.UnsafeValue()
require.NoError(t, err)
val, err := storage.DecodeMVCCValue(rawVal)
require.NoError(t, err)
if keyCount == 0 {
require.NotEqual(t, uint32(0), val.ImportEpoch)
} else if keyCount == 1 {
require.Equal(t, uint32(0), val.ImportEpoch)
} else {
t.Fatal("more than 2 keys in the table")
}
require.Equal(t, hlc.ClockTimestamp{}, val.LocalTimestamp)
keyCount++
}
}
}
11 changes: 11 additions & 0 deletions pkg/sql/importer/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
Expand Down Expand Up @@ -389,6 +390,14 @@ func ingestKvs(
// will hog memory as it tries to grow more aggressively.
minBufferSize, maxBufferSize := importBufferConfigSizes(flowCtx.Cfg.Settings,
true /* isPKAdder */)

var bulkAdderImportEpoch uint32
if flowCtx.Cfg.Settings.Version.IsActive(ctx, clusterversion.V24_1) && len(spec.Tables) == 1 {
for _, v := range spec.Tables {
bulkAdderImportEpoch = v.Desc.ImportEpoch
}

}
pkIndexAdder, err := flowCtx.Cfg.BulkAdder(ctx, flowCtx.Cfg.DB.KV(), writeTS, kvserverbase.BulkAdderOptions{
Name: pkAdderName,
DisallowShadowingBelow: writeTS,
Expand All @@ -397,6 +406,7 @@ func ingestKvs(
MaxBufferSize: maxBufferSize,
InitialSplitsIfUnordered: int(spec.InitialSplits),
WriteAtBatchTimestamp: true,
ImportEpoch: bulkAdderImportEpoch,
})
if err != nil {
return nil, err
Expand All @@ -413,6 +423,7 @@ func ingestKvs(
MaxBufferSize: maxBufferSize,
InitialSplitsIfUnordered: int(spec.InitialSplits),
WriteAtBatchTimestamp: true,
ImportEpoch: bulkAdderImportEpoch,
})
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 6ed2789

Please sign in to comment.