From 6ea1c5eefb67080c6637baac4cda1f5dab1a9787 Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Sun, 26 Dec 2021 11:56:29 -0500 Subject: [PATCH] ptstorage: change Protect and GetRecord to work with `target` column Protected timestamp records now apply on a `ptpb.Target` instead of `roachpb.Spans`. This change modifies the `Protect` and `GetRecord` methods of the ptstorage interface. Protect will persist the `ptpb.Target` specified on the passed in `ptpb.Record` as an encoded protocol buffer in the `target` column, and `GetRecord` will read the `target` column when constructing a `ptpb.Record` to return to the caller. If we are running in a mixed version state where the migration that adds the `target` column to the system.pts_records table has not run, we use the existing logic to protect Spans. This logic has wholesale been moved to `deprecatedProtect` method. Once the migration has run, all calls to protect must have a non-nil `ptpb.Target` field on the record, as we will no longer be persisting the `DeprecatedSpans` field in the underlying system table. Post migration, we do not use the `kv.protectedts.max_spans` to enforce any limits, but continue to use `kv.protectedts.max_bytes` to enforce a ceiling on the total size of encoded targets we can store in the subsystem. Most tests in `ptstorage_test` continue to run in both a pre and post migration state to ensure that `Storage` is handling both span backed and target backed records correctly. This can be cleaned up in a future version when we get rid of spans from the pts record entirely. All other PTS tests in the `kvserver` package run on a version prior to the pts migration since the remainder of the new pts system is yet to be implemented. These tests will be revisited as and when we develop those pieces of the subsystem. Informs: #73727 Release note: None --- pkg/base/testing_knobs.go | 1 + pkg/ccl/backupccl/backup_test.go | 2 + pkg/kv/kvserver/client_protectedts_test.go | 3 +- pkg/kv/kvserver/protectedts/BUILD.bazel | 2 + .../kvserver/protectedts/ptcache/BUILD.bazel | 1 - .../protectedts/ptcache/cache_test.go | 18 +- .../protectedts/ptprovider/provider.go | 3 +- .../protectedts/ptstorage/BUILD.bazel | 4 + pkg/kv/kvserver/protectedts/ptstorage/sql.go | 54 ++++- .../kvserver/protectedts/ptstorage/storage.go | 202 ++++++++++++++-- .../protectedts/ptstorage/storage_test.go | 223 +++++++++++++----- .../protectedts/ptstorage/validate_test.go | 66 ++++-- .../protectedts/ptverifier/verifier.go | 7 + .../protectedts/ptverifier/verifier_test.go | 2 +- pkg/kv/kvserver/protectedts/testing_knobs.go | 29 +++ pkg/server/server.go | 3 + pkg/server/tenant.go | 2 + 17 files changed, 502 insertions(+), 120 deletions(-) create mode 100644 pkg/kv/kvserver/protectedts/testing_knobs.go diff --git a/pkg/base/testing_knobs.go b/pkg/base/testing_knobs.go index c63bf805a64c..2ad14fd11388 100644 --- a/pkg/base/testing_knobs.go +++ b/pkg/base/testing_knobs.go @@ -45,4 +45,5 @@ type TestingKnobs struct { SQLLivenessKnobs ModuleTestingKnobs TelemetryLoggingKnobs ModuleTestingKnobs DialerKnobs ModuleTestingKnobs + ProtectedTS ModuleTestingKnobs } diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 79ef1abbec33..17ab58ef81c8 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -6747,6 +6747,8 @@ func TestRestoreErrorPropagates(t *testing.T) { // TestProtectedTimestampsFailDueToLimits ensures that when creating a protected // timestamp record fails, we return the correct error. +// +// TODO(adityamaru): Remove in 22.2 once no records protect spans. func TestProtectedTimestampsFailDueToLimits(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/kv/kvserver/client_protectedts_test.go b/pkg/kv/kvserver/client_protectedts_test.go index 7b9fd21bf99b..fa1a0867078d 100644 --- a/pkg/kv/kvserver/client_protectedts_test.go +++ b/pkg/kv/kvserver/client_protectedts_test.go @@ -137,7 +137,8 @@ func TestProtectedTimestamps(t *testing.T) { beforeWrites := s0.Clock().Now() gcSoon() - pts := ptstorage.New(s0.ClusterSettings(), s0.InternalExecutor().(*sql.InternalExecutor)) + pts := ptstorage.New(s0.ClusterSettings(), s0.InternalExecutor().(*sql.InternalExecutor), + nil /* knobs */) ptsWithDB := ptstorage.WithDatabase(pts, s0.DB()) startKey := getTableStartKey("foo") ptsRec := ptpb.Record{ diff --git a/pkg/kv/kvserver/protectedts/BUILD.bazel b/pkg/kv/kvserver/protectedts/BUILD.bazel index 333aa226ac05..86b208f9f7f5 100644 --- a/pkg/kv/kvserver/protectedts/BUILD.bazel +++ b/pkg/kv/kvserver/protectedts/BUILD.bazel @@ -5,10 +5,12 @@ go_library( srcs = [ "protectedts.go", "settings.go", + "testing_knobs.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts", visibility = ["//visibility:public"], deps = [ + "//pkg/base", "//pkg/kv", "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/roachpb:with-mocks", diff --git a/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel b/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel index 37ee963c94c8..dfc9e33c5639 100644 --- a/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel +++ b/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel @@ -34,7 +34,6 @@ go_test( "//pkg/base", "//pkg/keys", "//pkg/kv/kvserver", - "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/protectedts", "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/kv/kvserver/protectedts/ptstorage", diff --git a/pkg/kv/kvserver/protectedts/ptcache/cache_test.go b/pkg/kv/kvserver/protectedts/ptcache/cache_test.go index 3e173e3e4826..cd251dd372b6 100644 --- a/pkg/kv/kvserver/protectedts/ptcache/cache_test.go +++ b/pkg/kv/kvserver/protectedts/ptcache/cache_test.go @@ -20,7 +20,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptcache" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" @@ -48,8 +47,8 @@ func TestCacheBasic(t *testing.T) { tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) defer tc.Stopper().Stop(ctx) s := tc.Server(0) - p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(), - s.InternalExecutor().(sqlutil.InternalExecutor)), s.DB()) + p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(sqlutil.InternalExecutor), + nil /* knobs */), s.DB()) // Set the poll interval to be very short. protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Microsecond) @@ -110,7 +109,7 @@ func TestRefresh(t *testing.T) { ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ - TestingRequestFilter: kvserverbase.ReplicaRequestFilter(st.requestFilter), + TestingRequestFilter: st.requestFilter, }, }, }, @@ -118,7 +117,7 @@ func TestRefresh(t *testing.T) { defer tc.Stopper().Stop(ctx) s := tc.Server(0) p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(), - s.InternalExecutor().(sqlutil.InternalExecutor)), s.DB()) + s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */), s.DB()) // Set the poll interval to be very long. protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) @@ -225,8 +224,7 @@ func TestStart(t *testing.T) { setup := func() (*testcluster.TestCluster, *ptcache.Cache) { tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) s := tc.Server(0) - p := ptstorage.New(s.ClusterSettings(), - s.InternalExecutor().(sqlutil.InternalExecutor)) + p := ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */) // Set the poll interval to be very long. protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) c := ptcache.New(ptcache.Config{ @@ -259,7 +257,7 @@ func TestQueryRecord(t *testing.T) { defer tc.Stopper().Stop(ctx) s := tc.Server(0) p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(), - s.InternalExecutor().(sqlutil.InternalExecutor)), s.DB()) + s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */), s.DB()) // Set the poll interval to be very long. protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) c := ptcache.New(ptcache.Config{ @@ -316,7 +314,7 @@ func TestIterate(t *testing.T) { defer tc.Stopper().Stop(ctx) s := tc.Server(0) p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(), - s.InternalExecutor().(sqlutil.InternalExecutor)), s.DB()) + s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */), s.DB()) // Set the poll interval to be very long. protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) @@ -381,7 +379,7 @@ func TestSettingChangedLeadsToFetch(t *testing.T) { defer tc.Stopper().Stop(ctx) s := tc.Server(0) p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(), - s.InternalExecutor().(sqlutil.InternalExecutor)), s.DB()) + s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */), s.DB()) // Set the poll interval to be very long. protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) diff --git a/pkg/kv/kvserver/protectedts/ptprovider/provider.go b/pkg/kv/kvserver/protectedts/ptprovider/provider.go index 3137b2ad3412..eb380af1a007 100644 --- a/pkg/kv/kvserver/protectedts/ptprovider/provider.go +++ b/pkg/kv/kvserver/protectedts/ptprovider/provider.go @@ -35,6 +35,7 @@ type Config struct { Stores *kvserver.Stores ReconcileStatusFuncs ptreconcile.StatusFuncs InternalExecutor sqlutil.InternalExecutor + Knobs *protectedts.TestingKnobs } type provider struct { @@ -48,7 +49,7 @@ func New(cfg Config) (protectedts.Provider, error) { if err := validateConfig(cfg); err != nil { return nil, err } - storage := ptstorage.New(cfg.Settings, cfg.InternalExecutor) + storage := ptstorage.New(cfg.Settings, cfg.InternalExecutor, cfg.Knobs) verifier := ptverifier.New(cfg.DB, storage) return &provider{ Storage: storage, diff --git a/pkg/kv/kvserver/protectedts/ptstorage/BUILD.bazel b/pkg/kv/kvserver/protectedts/ptstorage/BUILD.bazel index 69ddc318484b..a2f34e0febe2 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/BUILD.bazel +++ b/pkg/kv/kvserver/protectedts/ptstorage/BUILD.bazel @@ -13,6 +13,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptstorage", visibility = ["//visibility:public"], deps = [ + "//pkg/clusterversion", "//pkg/kv", "//pkg/kv/kvserver/protectedts", "//pkg/kv/kvserver/protectedts/ptpb", @@ -48,12 +49,15 @@ go_test( "//pkg/security", "//pkg/security/securitytest", "//pkg/server", + "//pkg/settings/cluster", "//pkg/sql", "//pkg/sql/catalog", "//pkg/sql/catalog/colinfo", + "//pkg/sql/catalog/descpb", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", "//pkg/sql/sqlutil", + "//pkg/sql/tests", "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/testcluster", diff --git a/pkg/kv/kvserver/protectedts/ptstorage/sql.go b/pkg/kv/kvserver/protectedts/ptstorage/sql.go index c07bed5d85e6..72c54d7c635f 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/sql.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/sql.go @@ -35,6 +35,26 @@ WITH checks AS (` + protectChecksCTE + `), updated_meta AS (` + protectUpsertMetaCTE + `), new_record AS (` + protectInsertRecordCTE + `) +SELECT + failed, + total_bytes AS prev_total_bytes, + version AS prev_version +FROM + checks, current_meta;` + + // The `target` column was added to `system.protected_ts_records` as part of + // the tenant migration `AlterSystemProtectedTimestampAddColumn` necessitating + // queries that write to the table with and without the column. In a + // mixed-version state (prior to the migration) we use the queries without tha + // target column. + // + // TODO(adityamaru): Delete this in 22.2. + protectQueryWithoutTarget = ` +WITH + current_meta AS (` + currentMetaCTE + `), + checks AS (` + protectChecksCTE + `), + updated_meta AS (` + protectUpsertMetaCTE + `), + new_record AS (` + protectInsertRecordWithoutTargetCTE + `) SELECT failed, num_spans AS prev_spans, @@ -84,6 +104,20 @@ RETURNING protectInsertRecordCTE = ` INSERT +INTO + system.protected_ts_records (id, ts, meta_type, meta, num_spans, spans, target) +( + SELECT + $4, $5, $6, $7, $8, $9, $10 + WHERE + NOT EXISTS(SELECT * FROM checks WHERE failed) +) +RETURNING + id +` + + protectInsertRecordWithoutTargetCTE = ` +INSERT INTO system.protected_ts_records (id, ts, meta_type, meta, num_spans, spans) ( @@ -96,12 +130,30 @@ RETURNING id ` - getRecordsQueryBase = ` + // The `target` column was added to `system.protected_ts_records` as part of + // the tenant migration `AlterSystemProtectedTimestampAddColumn` necessitating + // queries that read from the table with and without the column. In a + // mixed-version state (prior to the migration) we use the queries without tha + // target column. + // + // TODO(adityamaru): Delete this in 22.2. + getRecordsWithoutTargetQueryBase = ` SELECT id, ts, meta_type, meta, spans, verified FROM system.protected_ts_records` + getRecordsWithoutTargetQuery = getRecordsWithoutTargetQueryBase + ";" + getRecordWithoutTargetQuery = getRecordsWithoutTargetQueryBase + ` +WHERE + id = $1;` + + getRecordsQueryBase = ` +SELECT + id, ts, meta_type, meta, spans, verified, target +FROM + system.protected_ts_records` + getRecordsQuery = getRecordsQueryBase + ";" getRecordQuery = getRecordsQueryBase + ` WHERE diff --git a/pkg/kv/kvserver/protectedts/ptstorage/storage.go b/pkg/kv/kvserver/protectedts/ptstorage/storage.go index 1115801531a2..148211b47b1b 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/storage.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/storage.go @@ -14,6 +14,7 @@ package ptstorage import ( "context" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" @@ -45,13 +46,27 @@ import ( type storage struct { settings *cluster.Settings ex sqlutil.InternalExecutor + + knobs *protectedts.TestingKnobs } var _ protectedts.Storage = (*storage)(nil) +func useDeprecatedProtectedTSStorage( + ctx context.Context, st *cluster.Settings, knobs *protectedts.TestingKnobs, +) bool { + return !st.Version.IsActive(ctx, clusterversion.AlterSystemProtectedTimestampAddColumn) || + !knobs.EnableProtectedTimestampForMultiTenant +} + // New creates a new Storage. -func New(settings *cluster.Settings, ex sqlutil.InternalExecutor) protectedts.Storage { - return &storage{settings: settings, ex: ex} +func New( + settings *cluster.Settings, ex sqlutil.InternalExecutor, knobs *protectedts.TestingKnobs, +) protectedts.Storage { + if knobs == nil { + knobs = &protectedts.TestingKnobs{} + } + return &storage{settings: settings, ex: ex, knobs: knobs} } var errNoTxn = errors.New("must provide a non-nil transaction") @@ -71,17 +86,64 @@ func (p *storage) UpdateTimestamp( return nil } +func (p *storage) deprecatedProtect( + ctx context.Context, txn *kv.Txn, r *ptpb.Record, meta []byte, +) error { + s := makeSettings(p.settings) + encodedSpans, err := protoutil.Marshal(&Spans{Spans: r.DeprecatedSpans}) + if err != nil { // how can this possibly fail? + return errors.Wrap(err, "failed to marshal spans") + } + it, err := p.ex.QueryIteratorEx(ctx, "protectedts-deprecated-protect", txn, + sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, + protectQueryWithoutTarget, + s.maxSpans, s.maxBytes, len(r.DeprecatedSpans), + r.ID, r.Timestamp.AsOfSystemTime(), + r.MetaType, meta, + len(r.DeprecatedSpans), encodedSpans) + if err != nil { + return errors.Wrapf(err, "failed to write record %v", r.ID) + } + ok, err := it.Next(ctx) + if err != nil { + return errors.Wrapf(err, "failed to write record %v", r.ID) + } + if !ok { + return errors.Newf("failed to write record %v", r.ID) + } + row := it.Cur() + if err := it.Close(); err != nil { + log.Infof(ctx, "encountered %v when writing record %v", err, r.ID) + } + if failed := *row[0].(*tree.DBool); failed { + curNumSpans := int64(*row[1].(*tree.DInt)) + if s.maxSpans > 0 && curNumSpans+int64(len(r.DeprecatedSpans)) > s.maxSpans { + return errors.WithHint( + errors.Errorf("protectedts: limit exceeded: %d+%d > %d spans", curNumSpans, + len(r.DeprecatedSpans), s.maxSpans), + "SET CLUSTER SETTING kv.protectedts.max_spans to a higher value") + } + curBytes := int64(*row[2].(*tree.DInt)) + recordBytes := int64(len(encodedSpans) + len(r.Meta) + len(r.MetaType)) + if s.maxBytes > 0 && curBytes+recordBytes > s.maxBytes { + return errors.WithHint( + errors.Errorf("protectedts: limit exceeded: %d+%d > %d bytes", curBytes, recordBytes, + s.maxBytes), + "SET CLUSTER SETTING kv.protectedts.max_bytes to a higher value") + } + return protectedts.ErrExists + } + return nil +} + func (p *storage) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) error { - if err := validateRecordForProtect(r); err != nil { + if err := validateRecordForProtect(ctx, r, p.settings, p.knobs); err != nil { return err } if txn == nil { return errNoTxn } - encodedSpans, err := protoutil.Marshal(&Spans{Spans: r.DeprecatedSpans}) - if err != nil { // how can this possibly fail? - return errors.Wrap(err, "failed to marshal spans") - } + meta := r.Meta if meta == nil { // v20.1 crashes in rowToRecord and storage.Release if it finds a NULL @@ -91,14 +153,33 @@ func (p *storage) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) erro // TODO(nvanbenschoten): remove this for v21.1. meta = []byte{} } + + // The `target` column was added to `system.protected_ts_records` as part of + // the tenant migration `AlterSystemProtectedTimestampAddColumn`. Prior to the + // migration we should continue write records that protect `spans`. + // + // TODO(adityamaru): Delete in 22.2 once we exclusively protect `target`s. + if useDeprecatedProtectedTSStorage(ctx, p.settings, p.knobs) { + return p.deprecatedProtect(ctx, txn, r, meta) + } + + // Clear the `DeprecatedSpans` field even if it has been set by the caller. + // Once the `AlterSystemProtectedTimestampAddColumn` migration has run, we + // only want to persist the `target` on which the pts record applies. We have + // already verified that the record has a valid `target`. + r.DeprecatedSpans = nil s := makeSettings(p.settings) + encodedTarget, err := protoutil.Marshal(&ptpb.Target{Union: r.Target.GetUnion()}) + if err != nil { // how can this possibly fail? + return errors.Wrap(err, "failed to marshal spans") + } it, err := p.ex.QueryIteratorEx(ctx, "protectedts-protect", txn, sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, protectQuery, s.maxSpans, s.maxBytes, len(r.DeprecatedSpans), r.ID, r.Timestamp.AsOfSystemTime(), r.MetaType, meta, - len(r.DeprecatedSpans), encodedSpans) + len(r.DeprecatedSpans), encodedTarget, encodedTarget) if err != nil { return errors.Wrapf(err, "failed to write record %v", r.ID) } @@ -114,15 +195,8 @@ func (p *storage) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) erro log.Infof(ctx, "encountered %v when writing record %v", err, r.ID) } if failed := *row[0].(*tree.DBool); failed { - curNumSpans := int64(*row[1].(*tree.DInt)) - if s.maxSpans > 0 && curNumSpans+int64(len(r.DeprecatedSpans)) > s.maxSpans { - return errors.WithHint( - errors.Errorf("protectedts: limit exceeded: %d+%d > %d spans", curNumSpans, - len(r.DeprecatedSpans), s.maxSpans), - "SET CLUSTER SETTING kv.protectedts.max_spans to a higher value") - } - curBytes := int64(*row[2].(*tree.DInt)) - recordBytes := int64(len(encodedSpans) + len(r.Meta) + len(r.MetaType)) + curBytes := int64(*row[1].(*tree.DInt)) + recordBytes := int64(len(encodedTarget) + len(r.Meta) + len(r.MetaType)) if s.maxBytes > 0 && curBytes+recordBytes > s.maxBytes { return errors.WithHint( errors.Errorf("protectedts: limit exceeded: %d+%d > %d bytes", curBytes, recordBytes, @@ -131,13 +205,43 @@ func (p *storage) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) erro } return protectedts.ErrExists } + return nil } +func (p *storage) deprecatedGetRecord( + ctx context.Context, txn *kv.Txn, id uuid.UUID, +) (*ptpb.Record, error) { + row, err := p.ex.QueryRowEx(ctx, "protectedts-deprecated-GetRecord", txn, + sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, + getRecordWithoutTargetQuery, id.GetBytesMut()) + if err != nil { + return nil, errors.Wrapf(err, "failed to read record %v", id) + } + if len(row) == 0 { + return nil, protectedts.ErrNotExists + } + var r ptpb.Record + if err := rowToRecord(ctx, row, &r, p.settings, p.knobs); err != nil { + return nil, err + } + return &r, nil +} + func (p *storage) GetRecord(ctx context.Context, txn *kv.Txn, id uuid.UUID) (*ptpb.Record, error) { if txn == nil { return nil, errNoTxn } + + // The `target` column was added to `system.protected_ts_records` as part of + // the tenant migration `AlterSystemProtectedTimestampAddColumn`. Prior to the + // migration we should continue return records that protect `spans`. + // + // TODO(adityamaru): Delete in 22.2 once we exclusively protect `target`s. + if useDeprecatedProtectedTSStorage(ctx, p.settings, p.knobs) { + return p.deprecatedGetRecord(ctx, txn, id) + } + row, err := p.ex.QueryRowEx(ctx, "protectedts-GetRecord", txn, sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, getRecordQuery, id.GetBytesMut()) @@ -148,7 +252,7 @@ func (p *storage) GetRecord(ctx context.Context, txn *kv.Txn, id uuid.UUID) (*pt return nil, protectedts.ErrNotExists } var r ptpb.Record - if err := rowToRecord(ctx, row, &r); err != nil { + if err := rowToRecord(ctx, row, &r, p.settings, p.knobs); err != nil { return nil, err } return &r, nil @@ -225,18 +329,45 @@ func (p *storage) GetState(ctx context.Context, txn *kv.Txn) (ptpb.State, error) }, nil } +func (p *storage) deprecatedGetRecords(ctx context.Context, txn *kv.Txn) ([]ptpb.Record, error) { + it, err := p.ex.QueryIteratorEx(ctx, "protectedts-deprecated-GetRecords", txn, + sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, + getRecordsWithoutTargetQuery) + if err != nil { + return nil, errors.Wrap(err, "failed to read records") + } + + var ok bool + var records []ptpb.Record + for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { + var record ptpb.Record + if err := rowToRecord(ctx, it.Cur(), &record, p.settings, p.knobs); err != nil { + log.Errorf(ctx, "failed to parse row as record: %v", err) + } + records = append(records, record) + } + if err != nil { + return nil, errors.Wrap(err, "failed to read records") + } + return records, nil +} + func (p *storage) getRecords(ctx context.Context, txn *kv.Txn) ([]ptpb.Record, error) { + if useDeprecatedProtectedTSStorage(ctx, p.settings, p.knobs) { + return p.deprecatedGetRecords(ctx, txn) + } + it, err := p.ex.QueryIteratorEx(ctx, "protectedts-GetRecords", txn, - sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, - getRecordsQuery) + sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, getRecordsQuery) if err != nil { return nil, errors.Wrap(err, "failed to read records") } + var ok bool var records []ptpb.Record for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { var record ptpb.Record - if err := rowToRecord(ctx, it.Cur(), &record); err != nil { + if err := rowToRecord(ctx, it.Cur(), &record, p.settings, p.knobs); err != nil { log.Errorf(ctx, "failed to parse row as record: %v", err) } records = append(records, record) @@ -252,7 +383,13 @@ func (p *storage) getRecords(ctx context.Context, txn *kv.Txn) ([]ptpb.Record, e // they are logged but not returned. Returning an error due to malformed data // in the protected timestamp subsystem would create more problems than it would // solve. Malformed records can still be removed (and hopefully will be). -func rowToRecord(ctx context.Context, row tree.Datums, r *ptpb.Record) error { +func rowToRecord( + ctx context.Context, + row tree.Datums, + r *ptpb.Record, + st *cluster.Settings, + knobs *protectedts.TestingKnobs, +) error { r.ID = row[0].(*tree.DUuid).UUID.GetBytes() tsDecimal := row[1].(*tree.DDecimal) ts, err := tree.DecimalToHLC(&tsDecimal.Decimal) @@ -269,10 +406,18 @@ func rowToRecord(ctx context.Context, row tree.Datums, r *ptpb.Record) error { } var spans Spans if err := protoutil.Unmarshal([]byte(*row[4].(*tree.DBytes)), &spans); err != nil { - return errors.Wrapf(err, "failed to unmarshal spans for %v", r.ID) + return errors.Wrapf(err, "failed to unmarshal span for %v", r.ID) } r.DeprecatedSpans = spans.Spans r.Verified = bool(*row[5].(*tree.DBool)) + + if !useDeprecatedProtectedTSStorage(ctx, st, knobs) { + target := &ptpb.Target{} + if err := protoutil.Unmarshal([]byte(*row[6].(*tree.DBytes)), target); err != nil { + return errors.Wrapf(err, "failed to unmarshal target for %v", r.ID) + } + r.Target = target + } return nil } @@ -292,18 +437,25 @@ var ( errZeroTimestamp = errors.New("invalid zero value timestamp") errZeroID = errors.New("invalid zero value ID") errEmptySpans = errors.Errorf("invalid empty set of spans") + errNilTarget = errors.Errorf("invalid nil target") errInvalidMeta = errors.Errorf("invalid Meta with empty MetaType") errCreateVerifiedRecord = errors.Errorf("cannot create a verified record") ) -func validateRecordForProtect(r *ptpb.Record) error { +func validateRecordForProtect( + ctx context.Context, r *ptpb.Record, st *cluster.Settings, knobs *protectedts.TestingKnobs, +) error { if r.Timestamp.IsEmpty() { return errZeroTimestamp } if r.ID.GetUUID() == uuid.Nil { return errZeroID } - if len(r.DeprecatedSpans) == 0 { + useDeprecatedPTSStorage := useDeprecatedProtectedTSStorage(ctx, st, knobs) + if !useDeprecatedPTSStorage && r.Target == nil { + return errNilTarget + } + if useDeprecatedPTSStorage && len(r.DeprecatedSpans) == 0 { return errEmptySpans } if len(r.Meta) > 0 && len(r.MetaType) == 0 { diff --git a/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go b/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go index e7e9a576359e..e1c8a1d7c0da 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go @@ -20,6 +20,7 @@ import ( "sort" "strconv" "testing" + "unsafe" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" @@ -32,10 +33,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -48,8 +52,15 @@ import ( ) func TestStorage(t *testing.T) { - for _, test := range testCases { - t.Run(test.name, test.run) + for _, withDeprecatedSpans := range []bool{true, false} { + for _, test := range testCases { + name := test.name + if withDeprecatedSpans { + name = fmt.Sprintf("%s_withDeprecatedSpans", name) + test.runWithDeprecatedSpans = true + } + t.Run(name, test.run) + } } } @@ -57,14 +68,17 @@ var testCases = []testCase{ { name: "Protect - simple positive", ops: []op{ - protectOp{spans: tableSpans(42)}, + protectOp{ + target: tableTarget(42), + spans: tableSpans(42), + }, }, }, { - name: "Protect - no spans", + name: "Protect - no targets", ops: []op{ protectOp{ - expErr: "invalid empty set of spans", + expErr: "invalid (nil target|empty set of spans)", }, }, }, @@ -72,7 +86,7 @@ var testCases = []testCase{ name: "Protect - zero timestamp", ops: []op{ funcOp(func(ctx context.Context, t *testing.T, tCtx *testContext) { - rec := newRecord(hlc.Timestamp{}, "", nil, tableSpan(42)) + rec := newRecord(tCtx, hlc.Timestamp{}, "", nil, tableTarget(42), tableSpan(42)) err := tCtx.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return tCtx.pts.Protect(ctx, txn, &rec) }) @@ -84,7 +98,8 @@ var testCases = []testCase{ name: "Protect - already verified", ops: []op{ funcOp(func(ctx context.Context, t *testing.T, tCtx *testContext) { - rec := newRecord(tCtx.tc.Server(0).Clock().Now(), "", nil, tableSpan(42)) + rec := newRecord(tCtx, tCtx.tc.Server(0).Clock().Now(), "", nil, tableTarget(42), + tableSpan(42)) rec.Verified = true err := tCtx.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return tCtx.pts.Protect(ctx, txn, &rec) @@ -96,7 +111,7 @@ var testCases = []testCase{ { name: "Protect - already exists", ops: []op{ - protectOp{spans: tableSpans(42)}, + protectOp{spans: tableSpans(42), target: tableTarget(42)}, funcOp(func(ctx context.Context, t *testing.T, tCtx *testContext) { // When max_bytes or max_spans is set to 0 (i.e. unlimited), and a // protect op fails because the record already exists, we should report @@ -108,7 +123,7 @@ var testCases = []testCase{ require.NoError(t, err) }), funcOp(func(ctx context.Context, t *testing.T, tCtx *testContext) { - rec := newRecord(tCtx.tc.Server(0).Clock().Now(), "", nil, tableSpan(42)) + rec := newRecord(tCtx, tCtx.tc.Server(0).Clock().Now(), "", nil, tableTarget(42), tableSpan(42)) rec.ID = pickOneRecord(tCtx).GetBytes() err := tCtx.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return tCtx.pts.Protect(ctx, txn, &rec) @@ -146,11 +161,12 @@ var testCases = []testCase{ expErr: "protectedts: limit exceeded: 3\\+1 > 3 spans", }, }, + runWithDeprecatedSpans: true, }, { name: "Protect - too many bytes", ops: []op{ - protectOp{spans: tableSpans(42)}, + protectOp{spans: tableSpans(42), target: tableTarget(42)}, funcOp(func(ctx context.Context, t *testing.T, tCtx *testContext) { _, err := tCtx.tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.protectedts.max_bytes = $1", 1024) require.NoError(t, err) @@ -162,17 +178,19 @@ var testCases = []testCase{ s.EndKey = append(s.EndKey, bytes.Repeat([]byte{'a'}, 1024)...) return s }()), - expErr: "protectedts: limit exceeded: 8\\+1050 > 1024 bytes", + target: largeTableTarget(1024), + expErr: "protectedts: limit exceeded: .* bytes", }, protectOp{ - spans: tableSpans(1, 2), + spans: tableSpans(1, 2), + target: tableTargets(1, 2), }, }, }, { name: "Protect - unlimited bytes", ops: []op{ - protectOp{spans: tableSpans(42)}, + protectOp{spans: tableSpans(42), target: tableTarget(42)}, funcOp(func(ctx context.Context, t *testing.T, tCtx *testContext) { _, err := tCtx.tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.protectedts.max_bytes = $1", 0) require.NoError(t, err) @@ -184,9 +202,11 @@ var testCases = []testCase{ s.EndKey = append(s.EndKey, bytes.Repeat([]byte{'a'}, 2<<20 /* 2 MiB */)...) return s }()), + target: largeTableTarget(2 << 20 /* 2 MiB */), }, protectOp{ - spans: tableSpans(1, 2), + spans: tableSpans(1, 2), + target: tableTargets(1, 2), }, }, }, @@ -212,6 +232,7 @@ var testCases = []testCase{ spans: tableSpans(1, 2), }, }, + runWithDeprecatedSpans: true, }, { name: "GetRecord - does not exist", @@ -230,7 +251,7 @@ var testCases = []testCase{ { name: "MarkVerified", ops: []op{ - protectOp{spans: tableSpans(42)}, + protectOp{target: tableTarget(42), spans: tableSpans(42)}, markVerifiedOp{idFunc: pickOneRecord}, markVerifiedOp{idFunc: pickOneRecord}, // it's idempotent markVerifiedOp{ @@ -242,7 +263,7 @@ var testCases = []testCase{ { name: "Release", ops: []op{ - protectOp{spans: tableSpans(42)}, + protectOp{target: tableTarget(42), spans: tableSpans(42)}, releaseOp{idFunc: pickOneRecord}, releaseOp{ idFunc: randomID, @@ -253,7 +274,7 @@ var testCases = []testCase{ { name: "UpdateTimestamp", ops: []op{ - protectOp{spans: tableSpans(42)}, + protectOp{spans: tableSpans(42), target: tableTarget(42)}, updateTimestampOp{ expectedRecordFn: func(record ptpb.Record) ptpb.Record { record.Timestamp = hlc.Timestamp{WallTime: 1} @@ -278,7 +299,7 @@ var testCases = []testCase{ name: "nil transaction errors", ops: []op{ funcOp(func(ctx context.Context, t *testing.T, tCtx *testContext) { - rec := newRecord(tCtx.tc.Server(0).Clock().Now(), "", nil, tableSpan(42)) + rec := newRecord(tCtx, tCtx.tc.Server(0).Clock().Now(), "", nil, tableTarget(42), tableSpan(42)) const msg = "must provide a non-nil transaction" require.Regexp(t, msg, tCtx.pts.Protect(ctx, nil /* txn */, &rec).Error()) require.Regexp(t, msg, tCtx.pts.Release(ctx, nil /* txn */, uuid.MakeV4()).Error()) @@ -299,6 +320,11 @@ type testContext struct { tc *testcluster.TestCluster db *kv.DB + // If set to false, the test will be run with + // `EnableProtectedTimestampForMultiTenant` set to true, thereby testing the + // "new" protected timestamp logic that runs on targets instead of spans. + runWithDeprecatedSpans bool + state ptpb.State } @@ -337,8 +363,14 @@ func (r releaseOp) run(ctx context.Context, t *testing.T, tCtx *testContext) { tCtx.state.Version++ tCtx.state.NumRecords-- tCtx.state.NumSpans -= uint64(len(rec.DeprecatedSpans)) - encoded, err := protoutil.Marshal(&ptstorage.Spans{Spans: rec.DeprecatedSpans}) - require.NoError(t, err) + var encoded []byte + if tCtx.runWithDeprecatedSpans { + encoded, err = protoutil.Marshal(&ptstorage.Spans{Spans: rec.DeprecatedSpans}) + require.NoError(t, err) + } else { + encoded, err = protoutil.Marshal(&ptpb.Target{Union: rec.Target.GetUnion()}) + require.NoError(t, err) + } tCtx.state.TotalBytes -= uint64(len(encoded) + len(rec.Meta) + len(rec.MetaType)) } } @@ -369,11 +401,12 @@ type protectOp struct { metaType string meta []byte spans []roachpb.Span + target *ptpb.Target expErr string } func (p protectOp) run(ctx context.Context, t *testing.T, tCtx *testContext) { - rec := newRecord(tCtx.tc.Server(0).Clock().Now(), p.metaType, p.meta, p.spans...) + rec := newRecord(tCtx, tCtx.tc.Server(0).Clock().Now(), p.metaType, p.meta, p.target, p.spans...) if p.idFunc != nil { rec.ID = p.idFunc(tCtx).GetBytes() } @@ -393,8 +426,14 @@ func (p protectOp) run(ctx context.Context, t *testing.T, tCtx *testContext) { tCtx.state.Version++ tCtx.state.NumRecords++ tCtx.state.NumSpans += uint64(len(rec.DeprecatedSpans)) - encoded, err := protoutil.Marshal(&ptstorage.Spans{Spans: p.spans}) - require.NoError(t, err) + var encoded []byte + if tCtx.runWithDeprecatedSpans { + encoded, err = protoutil.Marshal(&ptstorage.Spans{Spans: rec.DeprecatedSpans}) + require.NoError(t, err) + } else { + encoded, err = protoutil.Marshal(&ptpb.Target{Union: rec.Target.GetUnion()}) + require.NoError(t, err) + } tCtx.state.TotalBytes += uint64(len(encoded) + len(p.meta) + len(p.metaType)) } } @@ -423,23 +462,31 @@ func (p updateTimestampOp) run(ctx context.Context, t *testing.T, tCtx *testCont } type testCase struct { - name string - ops []op + name string + ops []op + runWithDeprecatedSpans bool } func (test testCase) run(t *testing.T) { ctx := context.Background() - tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + var params base.TestServerArgs + + ptsKnobs := &protectedts.TestingKnobs{} + if !test.runWithDeprecatedSpans { + ptsKnobs.EnableProtectedTimestampForMultiTenant = true + params.Knobs.ProtectedTS = ptsKnobs + } + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params}) defer tc.Stopper().Stop(ctx) s := tc.Server(0) - pts := ptstorage.New(s.ClusterSettings(), - s.InternalExecutor().(*sql.InternalExecutor)) + pts := ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(*sql.InternalExecutor), ptsKnobs) db := s.DB() tCtx := testContext{ - pts: pts, - db: db, - tc: tc, + pts: pts, + db: db, + tc: tc, + runWithDeprecatedSpans: test.runWithDeprecatedSpans, } verify := func(t *testing.T) { var state ptpb.State @@ -486,6 +533,28 @@ func pickOneRecord(tCtx *testContext) uuid.UUID { return tCtx.state.Records[rand.Intn(numRecords)].ID.GetUUID() } +func tableTargets(ids ...uint32) *ptpb.Target { + var tableIDs []descpb.ID + for _, id := range ids { + tableIDs = append(tableIDs, descpb.ID(id)) + } + return ptpb.MakeSchemaObjectsTarget(tableIDs) +} + +func tableTarget(tableID uint32) *ptpb.Target { + return ptpb.MakeSchemaObjectsTarget([]descpb.ID{descpb.ID(tableID)}) +} + +func largeTableTarget(targetBytesSize int64) *ptpb.Target { + var tableID descpb.ID + idSize := int64(unsafe.Sizeof(tableID)) + ids := make([]descpb.ID, 0) + for i := int64(0); i < targetBytesSize/idSize; i++ { + ids = append(ids, descpb.ID(rand.Uint32())) + } + return ptpb.MakeSchemaObjectsTarget(ids) +} + func tableSpan(tableID uint32) roachpb.Span { return roachpb.Span{ Key: keys.SystemSQLCodec.TablePrefix(tableID), @@ -501,7 +570,19 @@ func tableSpans(tableIDs ...uint32) []roachpb.Span { return spans } -func newRecord(ts hlc.Timestamp, metaType string, meta []byte, spans ...roachpb.Span) ptpb.Record { +func newRecord( + tCtx *testContext, + ts hlc.Timestamp, + metaType string, + meta []byte, + target *ptpb.Target, + spans ...roachpb.Span, +) ptpb.Record { + if tCtx.runWithDeprecatedSpans { + target = nil + } else { + spans = nil + } return ptpb.Record{ ID: uuid.MakeV4().GetBytes(), Timestamp: ts, @@ -509,6 +590,7 @@ func newRecord(ts hlc.Timestamp, metaType string, meta []byte, spans ...roachpb. MetaType: metaType, Meta: meta, DeprecatedSpans: spans, + Target: target, } } @@ -531,33 +613,27 @@ func newRecord(ts hlc.Timestamp, metaType string, meta []byte, spans ...roachpb. func TestCorruptData(t *testing.T) { ctx := context.Background() - t.Run("corrupt spans", func(t *testing.T) { - // Set the log scope so we can introspect the logged errors. - scope := log.Scope(t) - defer scope.Close(t) - - tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) - defer tc.Stopper().Stop(ctx) - - s := tc.Server(0) - pts := ptstorage.New(s.ClusterSettings(), - s.InternalExecutor().(*sql.InternalExecutor)) - - rec := newRecord(s.Clock().Now(), "foo", []byte("bar"), tableSpan(42)) + runCorruptDataTest := func(tCtx *testContext, s serverutils.TestServerInterface, + tc *testcluster.TestCluster, pts protectedts.Storage) { + rec := newRecord(tCtx, s.Clock().Now(), "foo", []byte("bar"), tableTarget(42), tableSpan(42)) require.NoError(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return pts.Protect(ctx, txn, &rec) })) ie := tc.Server(0).InternalExecutor().(sqlutil.InternalExecutor) + updateQuery := "UPDATE system.protected_ts_records SET target = $1 WHERE id = $2" + if tCtx.runWithDeprecatedSpans { + updateQuery = "UPDATE system.protected_ts_records SET spans = $1 WHERE id = $2" + } affected, err := ie.ExecEx( ctx, "corrupt-data", nil, /* txn */ sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, - "UPDATE system.protected_ts_records SET spans = $1 WHERE id = $2", + updateQuery, []byte("junk"), rec.ID.String()) require.NoError(t, err) require.Equal(t, 1, affected) var got *ptpb.Record - msg := regexp.MustCompile("failed to unmarshal spans for " + rec.ID.String() + ": ") + msg := regexp.MustCompile("failed to unmarshal (span|target) for " + rec.ID.String() + ": ") require.Regexp(t, msg, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { got, err = pts.GetRecord(ctx, txn, rec.ID.GetUUID()) @@ -576,20 +652,55 @@ func TestCorruptData(t *testing.T) { for _, e := range entries { require.Equal(t, severity.ERROR, e.Severity) } + } + + // TODO(adityamaru): Remove test when we delete `spans` field from + // record. + t.Run("corrupt spans", func(t *testing.T) { + // Set the log scope so we can introspect the logged errors. + scope := log.Scope(t) + defer scope.Close(t) + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + s := tc.Server(0) + pts := ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(*sql.InternalExecutor), + nil /* knobs */) + + tCtx := &testContext{runWithDeprecatedSpans: true} + runCorruptDataTest(tCtx, s, tc, pts) + }) + t.Run("corrupt target", func(t *testing.T) { + // Set the log scope so we can introspect the logged errors. + scope := log.Scope(t) + defer scope.Close(t) + + params, _ := tests.CreateTestServerParams() + ptsKnobs := &protectedts.TestingKnobs{EnableProtectedTimestampForMultiTenant: true} + params.Knobs.ProtectedTS = ptsKnobs + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params}) + defer tc.Stopper().Stop(ctx) + + s := tc.Server(0) + pts := ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(*sql.InternalExecutor), ptsKnobs) + runCorruptDataTest(&testContext{}, s, tc, pts) }) t.Run("corrupt hlc timestamp", func(t *testing.T) { // Set the log scope so we can introspect the logged errors. scope := log.Scope(t) defer scope.Close(t) - tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + params, _ := tests.CreateTestServerParams() + ptsKnobs := &protectedts.TestingKnobs{EnableProtectedTimestampForMultiTenant: true} + params.Knobs.ProtectedTS = ptsKnobs + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params}) defer tc.Stopper().Stop(ctx) s := tc.Server(0) - pts := ptstorage.New(s.ClusterSettings(), - s.InternalExecutor().(*sql.InternalExecutor)) + pts := ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(*sql.InternalExecutor), ptsKnobs) - rec := newRecord(s.Clock().Now(), "foo", []byte("bar"), tableSpan(42)) + rec := newRecord(&testContext{}, s.Clock().Now(), "foo", []byte("bar"), tableTarget(42), tableSpan(42)) require.NoError(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return pts.Protect(ctx, txn, &rec) })) @@ -635,18 +746,22 @@ func TestCorruptData(t *testing.T) { // are properly transmitted back to the client. func TestErrorsFromSQL(t *testing.T) { ctx := context.Background() - tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + params, _ := tests.CreateTestServerParams() + ptsKnobs := &protectedts.TestingKnobs{EnableProtectedTimestampForMultiTenant: true} + params.Knobs.ProtectedTS = ptsKnobs + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params}) defer tc.Stopper().Stop(ctx) s := tc.Server(0) ie := s.InternalExecutor().(sqlutil.InternalExecutor) wrappedIE := &wrappedInternalExecutor{wrapped: ie} - pts := ptstorage.New(s.ClusterSettings(), wrappedIE) + pts := ptstorage.New(s.ClusterSettings(), wrappedIE, ptsKnobs) wrappedIE.setErrFunc(func(string) error { return errors.New("boom") }) - rec := newRecord(s.Clock().Now(), "foo", []byte("bar"), tableSpan(42)) + rec := newRecord(&testContext{}, s.Clock().Now(), "foo", []byte("bar"), tableTarget(42), tableSpan(42)) require.EqualError(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return pts.Protect(ctx, txn, &rec) }), fmt.Sprintf("failed to write record %v: boom", rec.ID)) diff --git a/pkg/kv/kvserver/protectedts/ptstorage/validate_test.go b/pkg/kv/kvserver/protectedts/ptstorage/validate_test.go index 99a5a296d003..97f20918969d 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/validate_test.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/validate_test.go @@ -11,61 +11,58 @@ package ptstorage import ( + "context" "strconv" "testing" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" - roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/stretchr/testify/require" ) func TestValidateRecordForProtect(t *testing.T) { - spans := []roachpb.Span{ - { - Key: roachpb.Key("a"), - EndKey: roachpb.Key("b"), - }, - } + target := ptpb.MakeClusterTarget() for i, tc := range []struct { r *ptpb.Record err error }{ { r: &ptpb.Record{ - ID: uuid.MakeV4().GetBytes(), - Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1}, - MetaType: "job", - Meta: []byte("junk"), - DeprecatedSpans: spans, + ID: uuid.MakeV4().GetBytes(), + Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1}, + MetaType: "job", + Meta: []byte("junk"), + Target: target, }, err: nil, }, { r: &ptpb.Record{ - Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1}, - MetaType: "job", - Meta: []byte("junk"), - DeprecatedSpans: spans, + Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1}, + MetaType: "job", + Meta: []byte("junk"), + Target: target, }, err: errZeroID, }, { r: &ptpb.Record{ - ID: uuid.MakeV4().GetBytes(), - MetaType: "job", - Meta: []byte("junk"), - DeprecatedSpans: spans, + ID: uuid.MakeV4().GetBytes(), + MetaType: "job", + Meta: []byte("junk"), + Target: target, }, err: errZeroTimestamp, }, { r: &ptpb.Record{ - ID: uuid.MakeV4().GetBytes(), - Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1}, - Meta: []byte("junk"), - DeprecatedSpans: spans, + ID: uuid.MakeV4().GetBytes(), + Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1}, + Meta: []byte("junk"), + Target: target, }, err: errInvalidMeta, }, @@ -76,11 +73,28 @@ func TestValidateRecordForProtect(t *testing.T) { MetaType: "job", Meta: []byte("junk"), }, - err: errEmptySpans, + err: errNilTarget, }, } { t.Run(strconv.Itoa(i), func(t *testing.T) { - require.Equal(t, validateRecordForProtect(tc.r), tc.err) + st := cluster.MakeTestingClusterSettings() + require.Equal(t, validateRecordForProtect(context.Background(), tc.r, st, + &protectedts.TestingKnobs{EnableProtectedTimestampForMultiTenant: true}), tc.err) + }) + + // Test that prior to the `AlterSystemProtectedTimestampAddColumn` migration + // we validate that records have a non-nil `Spans` field. + t.Run("errEmptySpans", func(t *testing.T) { + r := &ptpb.Record{ + ID: uuid.MakeV4().GetBytes(), + Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1}, + MetaType: "job", + Meta: []byte("junk"), + Target: target, + } + st := cluster.MakeTestingClusterSettings() + require.Equal(t, validateRecordForProtect(context.Background(), r, st, + &protectedts.TestingKnobs{}), errEmptySpans) }) } } diff --git a/pkg/kv/kvserver/protectedts/ptverifier/verifier.go b/pkg/kv/kvserver/protectedts/ptverifier/verifier.go index 26f7d5af6679..c3f46349c80d 100644 --- a/pkg/kv/kvserver/protectedts/ptverifier/verifier.go +++ b/pkg/kv/kvserver/protectedts/ptverifier/verifier.go @@ -45,6 +45,13 @@ func (v *verifier) Verify(ctx context.Context, id uuid.UUID) error { return errors.Wrapf(err, "failed to fetch record %s", id) } + // TODO(adityamaru): Remove this once we delete all `Verify` calls. The new + // subsystem is not going to provide verification semantics. Until then mark + // the record as verified so it is a noop. + if r.DeprecatedSpans == nil { + return nil + } + if r.Verified { // already verified return nil } diff --git a/pkg/kv/kvserver/protectedts/ptverifier/verifier_test.go b/pkg/kv/kvserver/protectedts/ptverifier/verifier_test.go index 311abbe1032a..de87dfbd5339 100644 --- a/pkg/kv/kvserver/protectedts/ptverifier/verifier_test.go +++ b/pkg/kv/kvserver/protectedts/ptverifier/verifier_test.go @@ -63,7 +63,7 @@ func TestVerifier(t *testing.T) { }), ) - pts := ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(sqlutil.InternalExecutor)) + pts := ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */) withDB := ptstorage.WithDatabase(pts, s.DB()) db := kv.NewDB(s.DB().AmbientContext, tsf, s.Clock(), s.Stopper()) ptv := ptverifier.New(db, pts) diff --git a/pkg/kv/kvserver/protectedts/testing_knobs.go b/pkg/kv/kvserver/protectedts/testing_knobs.go new file mode 100644 index 000000000000..5a57770f930b --- /dev/null +++ b/pkg/kv/kvserver/protectedts/testing_knobs.go @@ -0,0 +1,29 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package protectedts + +import "github.com/cockroachdb/cockroach/pkg/base" + +// TestingKnobs provide fine-grained control over the various span config +// components for testing. +type TestingKnobs struct { + // EnableProtectedTimestampForMultiTenant when set to true, runs the protected + // timestamp subsystem that depends on span configuration reconciliation. + // + // TODO(adityamaru,arulajmani): Default this to true, or flip it to + // `DisableProtectedTimestampForMultiTenant` prior to release 22.1. + EnableProtectedTimestampForMultiTenant bool +} + +// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. +func (t *TestingKnobs) ModuleTestingKnobs() {} + +var _ base.ModuleTestingKnobs = (*TestingKnobs)(nil) diff --git a/pkg/server/server.go b/pkg/server/server.go index 66da809a3230..afaf015edc87 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -580,10 +580,13 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { user security.SQLUsername) (cloud.ExternalStorage, error) { return externalStorageBuilder.makeExternalStorageFromURI(ctx, uri, user) } + + protectedtsKnobs, _ := cfg.TestingKnobs.ProtectedTS.(*protectedts.TestingKnobs) protectedtsProvider, err := ptprovider.New(ptprovider.Config{ DB: db, InternalExecutor: internalExecutor, Settings: st, + Knobs: protectedtsKnobs, }) if err != nil { return nil, err diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 06cc4b4446b2..c74b18a3913b 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -450,10 +450,12 @@ func makeTenantSQLServerArgs( // clusters. var protectedTSProvider protectedts.Provider { + protectedtsKnobs, _ := baseCfg.TestingKnobs.ProtectedTS.(*protectedts.TestingKnobs) pp, err := ptprovider.New(ptprovider.Config{ DB: db, InternalExecutor: circularInternalExecutor, Settings: st, + Knobs: protectedtsKnobs, }) if err != nil { panic(err)