bulk/kv write jobID to each MVCCValue's MVCCValueHeader during IMPORT
This patch makes IMPORT write the import job ID to each ingested MVCCValue,
via the SSTBatcher. In a future PR, the import job ID will be used to track and
roll back an IMPORT. This additional information will allow IMPORTing tables to
be backed up and restored, as described in this [RFC](https://docs.google.com/document/d/16TbkFznqbsu3mialSw6o1sxOEn1pKhhJ9HTxNdw0-WE/edit#heading=h.bpox0bmkz77i).

Informs #76722

Release note: None
msbutler committed Jul 29, 2022
1 parent 107909d commit 38ee242
Showing 8 changed files with 242 additions and 6 deletions.
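At a glance, the patch threads the job ID from BulkAdderOptions through the BufferingAdder into the SSTBatcher, which stamps it into each versioned value's MVCCValueHeader before the key/value pair lands in the SST. A condensed sketch of that flow, assembled from the hunks below (error handling elided; not standalone code):

	// pkg/sql/importer/import_processor.go: the IMPORT processor opts in.
	opts := kvserverbase.BulkAdderOptions{ImportJobID: spec.JobID}

	// pkg/kv/bulk/buffering_adder.go: doFlush picks the stamping path.
	// importJobID != 0 -> sink.AddMVCCKeyWithImportID; otherwise sink.AddMVCCKey.

	// pkg/kv/bulk/sst_batcher.go: decode, stamp, re-encode, then add as usual.
	mvccVal, _ := storage.DecodeMVCCValue(value)
	mvccVal.MVCCValueHeader.ClientMeta.ImportJobId = importJobID
	encVal, _ := storage.EncodeMVCCValue(mvccVal)
	_ = b.AddMVCCKey(ctx, key, encVal) // ultimately written via sstWriter.PutRawMVCC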
9 changes: 9 additions & 0 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -477,6 +477,15 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
if log.V(5) {
log.Infof(ctx, "Put %s -> %s", key.Key, value.PrettyPrint())
}
// TODO(msbutler): figure out what import job ID to encode in MVCC keys that
// are part of an in-progress IMPORT. The central question: do we assign a
// new import job ID to the in-progress IMPORT? If so, could we do that
// before ingestion, and write the new job ID here?
//
// Further, I don't think keys from a completed import need to preserve
// their import job ID. Ideally, this could be resolved on the backup side:
// i.e., an Export request would only preserve the job ID on keys that are
// part of an in-progress IMPORT.
if err := batcher.AddMVCCKey(ctx, key, value.RawBytes); err != nil {
return summary, errors.Wrapf(err, "adding to batch: %s -> %s", key, value.PrettyPrint())
}
18 changes: 15 additions & 3 deletions pkg/kv/bulk/buffering_adder.go
@@ -56,6 +56,11 @@ type BufferingAdder struct {
// name of the BufferingAdder for the purpose of logging only.
name string

// importJobID specifies the import job the BufferingAdder is doing work for.
// If specified, the Bulk Adder's SSTBatcher will write the job ID to each
// versioned value's metadata.
importJobID int64

bulkMon *mon.BytesMonitor
memAcc mon.BoundAccount

@@ -89,7 +94,8 @@ func MakeBulkAdder(
}

b := &BufferingAdder{
-name: opts.Name,
+name:        opts.Name,
+importJobID: opts.ImportJobID,
sink: SSTBatcher{
name: opts.Name,
db: db,
@@ -265,8 +271,14 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {

for i := range b.curBuf.entries {
mvccKey.Key = b.curBuf.Key(i)
-if err := b.sink.AddMVCCKey(ctx, mvccKey, b.curBuf.Value(i)); err != nil {
-	return err
+if b.importJobID != 0 {
+	if err := b.sink.AddMVCCKeyWithImportID(ctx, mvccKey, b.curBuf.Value(i), b.importJobID); err != nil {
+		return err
+	}
+} else {
+	if err := b.sink.AddMVCCKey(ctx, mvccKey, b.curBuf.Value(i)); err != nil {
+		return err
+	}
}
}
if err := b.sink.Flush(ctx); err != nil {
18 changes: 16 additions & 2 deletions pkg/kv/bulk/sst_batcher.go
@@ -249,6 +249,21 @@ func (b *SSTBatcher) updateMVCCStats(key storage.MVCCKey, value []byte) {
b.ms.ValCount++
}

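// AddMVCCKeyWithImportID wraps AddMVCCKey: it decodes the raw value, writes
// the supplied import job ID into the value's MVCCValueHeader, re-encodes
// the value, and adds the resulting key/value pair to the batch.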
func (b *SSTBatcher) AddMVCCKeyWithImportID(
ctx context.Context, key storage.MVCCKey, value []byte, importJobID int64,
) error {
mvccVal, err := storage.DecodeMVCCValue(value)
if err != nil {
return err
}
mvccVal.MVCCValueHeader.ClientMeta.ImportJobId = importJobID
encVal, err := storage.EncodeMVCCValue(mvccVal)
if err != nil {
return err
}
return b.AddMVCCKey(ctx, key, encVal)
}

// AddMVCCKey adds a key+timestamp/value pair to the batch (flushing if needed).
// This is only for callers that want to control the timestamp on individual
// keys -- like RESTORE where we want the restored data to look like the backup.
@@ -310,8 +325,7 @@ func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value
if !b.disallowShadowingBelow.IsEmpty() {
b.updateMVCCStats(key, value)
}
-
-return b.sstWriter.Put(key, value)
+return b.sstWriter.PutRawMVCC(key, value)
}

// Reset clears all state in the batcher and prepares it for reuse.
76 changes: 75 additions & 1 deletion pkg/kv/bulk/sst_batcher_test.go
@@ -46,6 +46,7 @@ func TestAddBatched(t *testing.T) {
})
}


func TestDuplicateHandling(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -298,7 +299,8 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
mem := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil)
reqs := limit.MakeConcurrentRequestLimiter("reqs", 1000)
b, err := bulk.MakeBulkAdder(
-ctx, kvDB, mockCache, s.ClusterSettings(), ts, kvserverbase.BulkAdderOptions{MaxBufferSize: batchSize}, mem, reqs,
+ctx, kvDB, mockCache, s.ClusterSettings(), ts,
+kvserverbase.BulkAdderOptions{MaxBufferSize: batchSize, ImportJobID: DummyJobID}, mem, reqs,
)
require.NoError(t, err)

@@ -351,3 +353,75 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
})
}
}

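// DummyJobID is a placeholder import job ID used by these tests to exercise
// the job ID stamping path.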
var DummyJobID int64 = 9999

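// TestJobIDIngestion verifies that AddMVCCKeyWithImportID writes the given
// job ID into each ingested value's MVCCValueHeader, and that an
// ExportRequest reads it back.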
func TestJobIDIngestion(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

mem := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil)
reqs := limit.MakeConcurrentRequestLimiter("reqs", 1000)
s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

b, err := bulk.MakeTestingSSTBatcher(ctx, kvDB, s.ClusterSettings(),
false, true, mem.MakeBoundAccount(), reqs)
require.NoError(t, err)
defer b.Close(ctx)

startKey := storageutils.PointKey("a", 1)
endKey := storageutils.PointKey("b", 1)
value := storageutils.StringValueRaw("myHumbleValue")
mvccValue, err := storage.DecodeMVCCValue(value)
require.NoError(t, err)

require.NoError(t, b.AddMVCCKeyWithImportID(ctx, startKey, value, DummyJobID))
require.NoError(t, b.AddMVCCKeyWithImportID(ctx, endKey, value, DummyJobID))
require.NoError(t, b.Flush(ctx))

// Check that the ingested keys contain the dummy job ID.
req := &roachpb.ExportRequest{
RequestHeader: roachpb.RequestHeader{
Key: startKey.Key,
EndKey: endKey.Key,
},
MVCCFilter: roachpb.MVCCFilter_All,
StartTime: hlc.Timestamp{},
ReturnSST: true,
}

header := roachpb.Header{Timestamp: s.Clock().Now()}
resp, roachErr := kv.SendWrappedWith(ctx,
kvDB.NonTransactionalSender(), header, req)
require.NoError(t, roachErr.GoError())
iterOpts := storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsOnly,
LowerBound: startKey.Key,
UpperBound: endKey.Key,
}

checkedJobId := false
for _, file := range resp.(*roachpb.ExportResponse).Files {
it, err := storage.NewPebbleMemSSTIterator(file.SST, false /* verify */, iterOpts)
require.NoError(t, err)
defer it.Close()
for it.SeekGE(storage.NilKey); ; it.Next() {
ok, err := it.Valid()
require.NoError(t, err)
if !ok {
break
}
val, err := storage.DecodeMVCCValue(it.UnsafeValue())
require.NoError(t, err)
require.Equal(t, startKey, it.UnsafeKey())
require.Equal(t, mvccValue.Value, val.Value)
require.Equal(t, DummyJobID, val.ClientMeta.ImportJobId)
require.Equal(t, hlc.ClockTimestamp{}, val.LocalTimestamp)
checkedJobId = true
}
}
require.True(t, checkedJobId)
}
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/kvserverbase/bulk_adder.go
@@ -68,6 +68,11 @@ type BulkAdderOptions struct {
// the first buffer to pick split points in the hope it is a representative
// sample of the overall input.
InitialSplitsIfUnordered int

// ImportJobID specifies the import job the Bulk Adder is doing work for. If
// specified, the Bulk Adder's SSTBatcher will write the job ID to each
// versioned value's metadata.
ImportJobID int64
}

// BulkAdderFactory describes a factory function for BulkAdders.
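For reference, wiring the new option into a bulk adder mirrors the MakeBulkAdder call updated in the batcher test above; a sketch (variable names here are illustrative, not from this patch):

	adder, err := bulk.MakeBulkAdder(
		ctx, kvDB, rangeCache, settings, writeTS,
		kvserverbase.BulkAdderOptions{Name: "import", MaxBufferSize: maxBufSize, ImportJobID: jobID},
		memMonitor, requestLimiter,
	)
	if err != nil {
		return err
	}
	defer adder.Close(ctx)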
3 changes: 3 additions & 0 deletions pkg/sql/importer/BUILD.bazel
@@ -134,6 +134,7 @@ go_test(
"exportparquet_test.go",
"import_csv_mark_redaction_test.go",
"import_into_test.go",
"import_mvcc_test.go",
"import_processor_test.go",
"import_stmt_test.go",
"importer_upgrade_test.go",
@@ -170,6 +171,7 @@ go_test(
"//pkg/jobs/jobstest",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb",
@@ -204,6 +206,7 @@ go_test(
"//pkg/sql/stats",
"//pkg/sql/tests",
"//pkg/sql/types",
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/serverutils",
117 changes: 117 additions & 0 deletions pkg/sql/importer/import_mvcc_test.go
@@ -0,0 +1,117 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package importer_test

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

// TestMVCCValueHeaderImportJobID tests that the import job ID is properly
// stored in the MVCCValueHeader of an imported key's MVCCValue.
func TestMVCCValueHeaderImportJobID(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
ctx := context.Background()
defer s.Stopper().Stop(ctx)
sqlDB := sqlutils.MakeSQLRunner(db)

sqlDB.Exec(t, `CREATE DATABASE d`)

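// Stand-in for external storage: the server answers any GET with a one-row
// CSV file containing "1", which the IMPORT below ingests into d.t.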
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
fmt.Fprint(w, "1")
}
}))
defer srv.Close()

// Create a table where the first row (in sort order) comes from an IMPORT
// while the second comes from an INSERT.
sqlDB.Exec(t, `CREATE TABLE d.t (a INT8)`)
sqlDB.Exec(t, `INSERT INTO d.t VALUES ('2')`)
sqlDB.Exec(t, `IMPORT INTO d.t CSV DATA ($1)`, srv.URL)

// Issue an export request to iterate over the keys in the table.
var tableID uint32
sqlDB.QueryRow(t, `SELECT id FROM system.namespace WHERE name = $1`,
"t").Scan(&tableID)

startKey := keys.SystemSQLCodec.TablePrefix(tableID)
endKey := startKey.PrefixEnd()

req := &roachpb.ExportRequest{
RequestHeader: roachpb.RequestHeader{
Key: startKey,
EndKey: endKey,
},
MVCCFilter: roachpb.MVCCFilter_All,
StartTime: hlc.Timestamp{},
ReturnSST: true,
}

header := roachpb.Header{Timestamp: s.Clock().Now()}
resp, roachErr := kv.SendWrappedWith(ctx,
s.DistSenderI().(*kvcoord.DistSender), header, req)
require.NoError(t, roachErr.GoError())

iterOpts := storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsOnly,
LowerBound: startKey,
UpperBound: endKey,
}

// Ensure there are 2 keys in the span, and that only the first one contains
// job ID metadata.
keyCount := 0
for _, file := range resp.(*roachpb.ExportResponse).Files {
it, err := storage.NewPebbleMemSSTIterator(file.SST, false /* verify */, iterOpts)
require.NoError(t, err)
defer it.Close()
for it.SeekGE(storage.NilKey); ; it.Next() {
ok, err := it.Valid()
require.NoError(t, err)
if !ok {
break
}
val, err := storage.DecodeMVCCValue(it.UnsafeValue())
require.NoError(t, err)
if keyCount == 0 {
require.NotEqual(t, int64(0), val.ClientMeta.ImportJobId)
} else if keyCount == 1 {
require.Equal(t, int64(0), val.ClientMeta.ImportJobId)
} else {
t.Fatal("more than 2 keys in the table")
}
require.Equal(t, hlc.ClockTimestamp{}, val.LocalTimestamp)
keyCount++
}
}
}
2 changes: 2 additions & 0 deletions pkg/sql/importer/import_processor.go
@@ -388,6 +388,7 @@ func ingestKvs(
MaxBufferSize: maxBufferSize,
InitialSplitsIfUnordered: int(spec.InitialSplits),
WriteAtBatchTimestamp: writeAtBatchTimestamp,
ImportJobID: spec.JobID,
})
if err != nil {
return nil, err
@@ -404,6 +405,7 @@ func ingestKvs(
MaxBufferSize: maxBufferSize,
InitialSplitsIfUnordered: int(spec.InitialSplits),
WriteAtBatchTimestamp: writeAtBatchTimestamp,
ImportJobID: spec.JobID,
})
if err != nil {
return nil, err