diff --git a/pkg/base/testing_knobs.go b/pkg/base/testing_knobs.go index c63bf805a64c..2ad14fd11388 100644 --- a/pkg/base/testing_knobs.go +++ b/pkg/base/testing_knobs.go @@ -45,4 +45,5 @@ type TestingKnobs struct { SQLLivenessKnobs ModuleTestingKnobs TelemetryLoggingKnobs ModuleTestingKnobs DialerKnobs ModuleTestingKnobs + ProtectedTS ModuleTestingKnobs } diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 79ef1abbec33..17ab58ef81c8 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -6747,6 +6747,8 @@ func TestRestoreErrorPropagates(t *testing.T) { // TestProtectedTimestampsFailDueToLimits ensures that when creating a protected // timestamp record fails, we return the correct error. +// +// TODO(adityamaru): Remove in 22.2 once no records protect spans. func TestProtectedTimestampsFailDueToLimits(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/kv/kvserver/client_protectedts_test.go b/pkg/kv/kvserver/client_protectedts_test.go index 7b9fd21bf99b..fa1a0867078d 100644 --- a/pkg/kv/kvserver/client_protectedts_test.go +++ b/pkg/kv/kvserver/client_protectedts_test.go @@ -137,7 +137,8 @@ func TestProtectedTimestamps(t *testing.T) { beforeWrites := s0.Clock().Now() gcSoon() - pts := ptstorage.New(s0.ClusterSettings(), s0.InternalExecutor().(*sql.InternalExecutor)) + pts := ptstorage.New(s0.ClusterSettings(), s0.InternalExecutor().(*sql.InternalExecutor), + nil /* knobs */) ptsWithDB := ptstorage.WithDatabase(pts, s0.DB()) startKey := getTableStartKey("foo") ptsRec := ptpb.Record{ diff --git a/pkg/kv/kvserver/protectedts/BUILD.bazel b/pkg/kv/kvserver/protectedts/BUILD.bazel index 333aa226ac05..86b208f9f7f5 100644 --- a/pkg/kv/kvserver/protectedts/BUILD.bazel +++ b/pkg/kv/kvserver/protectedts/BUILD.bazel @@ -5,10 +5,12 @@ go_library( srcs = [ "protectedts.go", "settings.go", + "testing_knobs.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts", visibility = ["//visibility:public"], deps = [ + "//pkg/base", "//pkg/kv", "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/roachpb:with-mocks", diff --git a/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel b/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel index 37ee963c94c8..dfc9e33c5639 100644 --- a/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel +++ b/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel @@ -34,7 +34,6 @@ go_test( "//pkg/base", "//pkg/keys", "//pkg/kv/kvserver", - "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/protectedts", "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/kv/kvserver/protectedts/ptstorage", diff --git a/pkg/kv/kvserver/protectedts/ptcache/cache_test.go b/pkg/kv/kvserver/protectedts/ptcache/cache_test.go index 3e173e3e4826..cd251dd372b6 100644 --- a/pkg/kv/kvserver/protectedts/ptcache/cache_test.go +++ b/pkg/kv/kvserver/protectedts/ptcache/cache_test.go @@ -20,7 +20,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptcache" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" @@ -48,8 +47,8 @@ func TestCacheBasic(t *testing.T) { tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) defer tc.Stopper().Stop(ctx) s := tc.Server(0) - p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(), - s.InternalExecutor().(sqlutil.InternalExecutor)), s.DB()) + p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(sqlutil.InternalExecutor), + nil /* knobs */), s.DB()) // Set the poll interval to be very short. protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Microsecond) @@ -110,7 +109,7 @@ func TestRefresh(t *testing.T) { ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ - TestingRequestFilter: kvserverbase.ReplicaRequestFilter(st.requestFilter), + TestingRequestFilter: st.requestFilter, }, }, }, @@ -118,7 +117,7 @@ func TestRefresh(t *testing.T) { defer tc.Stopper().Stop(ctx) s := tc.Server(0) p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(), - s.InternalExecutor().(sqlutil.InternalExecutor)), s.DB()) + s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */), s.DB()) // Set the poll interval to be very long. protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) @@ -225,8 +224,7 @@ func TestStart(t *testing.T) { setup := func() (*testcluster.TestCluster, *ptcache.Cache) { tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) s := tc.Server(0) - p := ptstorage.New(s.ClusterSettings(), - s.InternalExecutor().(sqlutil.InternalExecutor)) + p := ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */) // Set the poll interval to be very long. protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) c := ptcache.New(ptcache.Config{ @@ -259,7 +257,7 @@ func TestQueryRecord(t *testing.T) { defer tc.Stopper().Stop(ctx) s := tc.Server(0) p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(), - s.InternalExecutor().(sqlutil.InternalExecutor)), s.DB()) + s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */), s.DB()) // Set the poll interval to be very long. protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) c := ptcache.New(ptcache.Config{ @@ -316,7 +314,7 @@ func TestIterate(t *testing.T) { defer tc.Stopper().Stop(ctx) s := tc.Server(0) p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(), - s.InternalExecutor().(sqlutil.InternalExecutor)), s.DB()) + s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */), s.DB()) // Set the poll interval to be very long. protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) @@ -381,7 +379,7 @@ func TestSettingChangedLeadsToFetch(t *testing.T) { defer tc.Stopper().Stop(ctx) s := tc.Server(0) p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(), - s.InternalExecutor().(sqlutil.InternalExecutor)), s.DB()) + s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */), s.DB()) // Set the poll interval to be very long. protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) diff --git a/pkg/kv/kvserver/protectedts/ptprovider/provider.go b/pkg/kv/kvserver/protectedts/ptprovider/provider.go index 3137b2ad3412..eb380af1a007 100644 --- a/pkg/kv/kvserver/protectedts/ptprovider/provider.go +++ b/pkg/kv/kvserver/protectedts/ptprovider/provider.go @@ -35,6 +35,7 @@ type Config struct { Stores *kvserver.Stores ReconcileStatusFuncs ptreconcile.StatusFuncs InternalExecutor sqlutil.InternalExecutor + Knobs *protectedts.TestingKnobs } type provider struct { @@ -48,7 +49,7 @@ func New(cfg Config) (protectedts.Provider, error) { if err := validateConfig(cfg); err != nil { return nil, err } - storage := ptstorage.New(cfg.Settings, cfg.InternalExecutor) + storage := ptstorage.New(cfg.Settings, cfg.InternalExecutor, cfg.Knobs) verifier := ptverifier.New(cfg.DB, storage) return &provider{ Storage: storage, diff --git a/pkg/kv/kvserver/protectedts/ptstorage/BUILD.bazel b/pkg/kv/kvserver/protectedts/ptstorage/BUILD.bazel index 69ddc318484b..a2f34e0febe2 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/BUILD.bazel +++ b/pkg/kv/kvserver/protectedts/ptstorage/BUILD.bazel @@ -13,6 +13,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptstorage", visibility = ["//visibility:public"], deps = [ + "//pkg/clusterversion", "//pkg/kv", "//pkg/kv/kvserver/protectedts", "//pkg/kv/kvserver/protectedts/ptpb", @@ -48,12 +49,15 @@ go_test( "//pkg/security", "//pkg/security/securitytest", "//pkg/server", + "//pkg/settings/cluster", "//pkg/sql", "//pkg/sql/catalog", "//pkg/sql/catalog/colinfo", + "//pkg/sql/catalog/descpb", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", "//pkg/sql/sqlutil", + "//pkg/sql/tests", "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/testcluster", diff --git a/pkg/kv/kvserver/protectedts/ptstorage/sql.go b/pkg/kv/kvserver/protectedts/ptstorage/sql.go index c07bed5d85e6..72c54d7c635f 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/sql.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/sql.go @@ -35,6 +35,26 @@ WITH checks AS (` + protectChecksCTE + `), updated_meta AS (` + protectUpsertMetaCTE + `), new_record AS (` + protectInsertRecordCTE + `) +SELECT + failed, + total_bytes AS prev_total_bytes, + version AS prev_version +FROM + checks, current_meta;` + + // The `target` column was added to `system.protected_ts_records` as part of + // the tenant migration `AlterSystemProtectedTimestampAddColumn` necessitating + // queries that write to the table with and without the column. In a + // mixed-version state (prior to the migration) we use the queries without tha + // target column. + // + // TODO(adityamaru): Delete this in 22.2. + protectQueryWithoutTarget = ` +WITH + current_meta AS (` + currentMetaCTE + `), + checks AS (` + protectChecksCTE + `), + updated_meta AS (` + protectUpsertMetaCTE + `), + new_record AS (` + protectInsertRecordWithoutTargetCTE + `) SELECT failed, num_spans AS prev_spans, @@ -84,6 +104,20 @@ RETURNING protectInsertRecordCTE = ` INSERT +INTO + system.protected_ts_records (id, ts, meta_type, meta, num_spans, spans, target) +( + SELECT + $4, $5, $6, $7, $8, $9, $10 + WHERE + NOT EXISTS(SELECT * FROM checks WHERE failed) +) +RETURNING + id +` + + protectInsertRecordWithoutTargetCTE = ` +INSERT INTO system.protected_ts_records (id, ts, meta_type, meta, num_spans, spans) ( @@ -96,12 +130,30 @@ RETURNING id ` - getRecordsQueryBase = ` + // The `target` column was added to `system.protected_ts_records` as part of + // the tenant migration `AlterSystemProtectedTimestampAddColumn` necessitating + // queries that read from the table with and without the column. In a + // mixed-version state (prior to the migration) we use the queries without tha + // target column. + // + // TODO(adityamaru): Delete this in 22.2. + getRecordsWithoutTargetQueryBase = ` SELECT id, ts, meta_type, meta, spans, verified FROM system.protected_ts_records` + getRecordsWithoutTargetQuery = getRecordsWithoutTargetQueryBase + ";" + getRecordWithoutTargetQuery = getRecordsWithoutTargetQueryBase + ` +WHERE + id = $1;` + + getRecordsQueryBase = ` +SELECT + id, ts, meta_type, meta, spans, verified, target +FROM + system.protected_ts_records` + getRecordsQuery = getRecordsQueryBase + ";" getRecordQuery = getRecordsQueryBase + ` WHERE diff --git a/pkg/kv/kvserver/protectedts/ptstorage/storage.go b/pkg/kv/kvserver/protectedts/ptstorage/storage.go index 1115801531a2..148211b47b1b 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/storage.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/storage.go @@ -14,6 +14,7 @@ package ptstorage import ( "context" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" @@ -45,13 +46,27 @@ import ( type storage struct { settings *cluster.Settings ex sqlutil.InternalExecutor + + knobs *protectedts.TestingKnobs } var _ protectedts.Storage = (*storage)(nil) +func useDeprecatedProtectedTSStorage( + ctx context.Context, st *cluster.Settings, knobs *protectedts.TestingKnobs, +) bool { + return !st.Version.IsActive(ctx, clusterversion.AlterSystemProtectedTimestampAddColumn) || + !knobs.EnableProtectedTimestampForMultiTenant +} + // New creates a new Storage. -func New(settings *cluster.Settings, ex sqlutil.InternalExecutor) protectedts.Storage { - return &storage{settings: settings, ex: ex} +func New( + settings *cluster.Settings, ex sqlutil.InternalExecutor, knobs *protectedts.TestingKnobs, +) protectedts.Storage { + if knobs == nil { + knobs = &protectedts.TestingKnobs{} + } + return &storage{settings: settings, ex: ex, knobs: knobs} } var errNoTxn = errors.New("must provide a non-nil transaction") @@ -71,17 +86,64 @@ func (p *storage) UpdateTimestamp( return nil } +func (p *storage) deprecatedProtect( + ctx context.Context, txn *kv.Txn, r *ptpb.Record, meta []byte, +) error { + s := makeSettings(p.settings) + encodedSpans, err := protoutil.Marshal(&Spans{Spans: r.DeprecatedSpans}) + if err != nil { // how can this possibly fail? + return errors.Wrap(err, "failed to marshal spans") + } + it, err := p.ex.QueryIteratorEx(ctx, "protectedts-deprecated-protect", txn, + sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, + protectQueryWithoutTarget, + s.maxSpans, s.maxBytes, len(r.DeprecatedSpans), + r.ID, r.Timestamp.AsOfSystemTime(), + r.MetaType, meta, + len(r.DeprecatedSpans), encodedSpans) + if err != nil { + return errors.Wrapf(err, "failed to write record %v", r.ID) + } + ok, err := it.Next(ctx) + if err != nil { + return errors.Wrapf(err, "failed to write record %v", r.ID) + } + if !ok { + return errors.Newf("failed to write record %v", r.ID) + } + row := it.Cur() + if err := it.Close(); err != nil { + log.Infof(ctx, "encountered %v when writing record %v", err, r.ID) + } + if failed := *row[0].(*tree.DBool); failed { + curNumSpans := int64(*row[1].(*tree.DInt)) + if s.maxSpans > 0 && curNumSpans+int64(len(r.DeprecatedSpans)) > s.maxSpans { + return errors.WithHint( + errors.Errorf("protectedts: limit exceeded: %d+%d > %d spans", curNumSpans, + len(r.DeprecatedSpans), s.maxSpans), + "SET CLUSTER SETTING kv.protectedts.max_spans to a higher value") + } + curBytes := int64(*row[2].(*tree.DInt)) + recordBytes := int64(len(encodedSpans) + len(r.Meta) + len(r.MetaType)) + if s.maxBytes > 0 && curBytes+recordBytes > s.maxBytes { + return errors.WithHint( + errors.Errorf("protectedts: limit exceeded: %d+%d > %d bytes", curBytes, recordBytes, + s.maxBytes), + "SET CLUSTER SETTING kv.protectedts.max_bytes to a higher value") + } + return protectedts.ErrExists + } + return nil +} + func (p *storage) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) error { - if err := validateRecordForProtect(r); err != nil { + if err := validateRecordForProtect(ctx, r, p.settings, p.knobs); err != nil { return err } if txn == nil { return errNoTxn } - encodedSpans, err := protoutil.Marshal(&Spans{Spans: r.DeprecatedSpans}) - if err != nil { // how can this possibly fail? - return errors.Wrap(err, "failed to marshal spans") - } + meta := r.Meta if meta == nil { // v20.1 crashes in rowToRecord and storage.Release if it finds a NULL @@ -91,14 +153,33 @@ func (p *storage) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) erro // TODO(nvanbenschoten): remove this for v21.1. meta = []byte{} } + + // The `target` column was added to `system.protected_ts_records` as part of + // the tenant migration `AlterSystemProtectedTimestampAddColumn`. Prior to the + // migration we should continue write records that protect `spans`. + // + // TODO(adityamaru): Delete in 22.2 once we exclusively protect `target`s. + if useDeprecatedProtectedTSStorage(ctx, p.settings, p.knobs) { + return p.deprecatedProtect(ctx, txn, r, meta) + } + + // Clear the `DeprecatedSpans` field even if it has been set by the caller. + // Once the `AlterSystemProtectedTimestampAddColumn` migration has run, we + // only want to persist the `target` on which the pts record applies. We have + // already verified that the record has a valid `target`. + r.DeprecatedSpans = nil s := makeSettings(p.settings) + encodedTarget, err := protoutil.Marshal(&ptpb.Target{Union: r.Target.GetUnion()}) + if err != nil { // how can this possibly fail? + return errors.Wrap(err, "failed to marshal spans") + } it, err := p.ex.QueryIteratorEx(ctx, "protectedts-protect", txn, sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, protectQuery, s.maxSpans, s.maxBytes, len(r.DeprecatedSpans), r.ID, r.Timestamp.AsOfSystemTime(), r.MetaType, meta, - len(r.DeprecatedSpans), encodedSpans) + len(r.DeprecatedSpans), encodedTarget, encodedTarget) if err != nil { return errors.Wrapf(err, "failed to write record %v", r.ID) } @@ -114,15 +195,8 @@ func (p *storage) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) erro log.Infof(ctx, "encountered %v when writing record %v", err, r.ID) } if failed := *row[0].(*tree.DBool); failed { - curNumSpans := int64(*row[1].(*tree.DInt)) - if s.maxSpans > 0 && curNumSpans+int64(len(r.DeprecatedSpans)) > s.maxSpans { - return errors.WithHint( - errors.Errorf("protectedts: limit exceeded: %d+%d > %d spans", curNumSpans, - len(r.DeprecatedSpans), s.maxSpans), - "SET CLUSTER SETTING kv.protectedts.max_spans to a higher value") - } - curBytes := int64(*row[2].(*tree.DInt)) - recordBytes := int64(len(encodedSpans) + len(r.Meta) + len(r.MetaType)) + curBytes := int64(*row[1].(*tree.DInt)) + recordBytes := int64(len(encodedTarget) + len(r.Meta) + len(r.MetaType)) if s.maxBytes > 0 && curBytes+recordBytes > s.maxBytes { return errors.WithHint( errors.Errorf("protectedts: limit exceeded: %d+%d > %d bytes", curBytes, recordBytes, @@ -131,13 +205,43 @@ func (p *storage) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) erro } return protectedts.ErrExists } + return nil } +func (p *storage) deprecatedGetRecord( + ctx context.Context, txn *kv.Txn, id uuid.UUID, +) (*ptpb.Record, error) { + row, err := p.ex.QueryRowEx(ctx, "protectedts-deprecated-GetRecord", txn, + sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, + getRecordWithoutTargetQuery, id.GetBytesMut()) + if err != nil { + return nil, errors.Wrapf(err, "failed to read record %v", id) + } + if len(row) == 0 { + return nil, protectedts.ErrNotExists + } + var r ptpb.Record + if err := rowToRecord(ctx, row, &r, p.settings, p.knobs); err != nil { + return nil, err + } + return &r, nil +} + func (p *storage) GetRecord(ctx context.Context, txn *kv.Txn, id uuid.UUID) (*ptpb.Record, error) { if txn == nil { return nil, errNoTxn } + + // The `target` column was added to `system.protected_ts_records` as part of + // the tenant migration `AlterSystemProtectedTimestampAddColumn`. Prior to the + // migration we should continue return records that protect `spans`. + // + // TODO(adityamaru): Delete in 22.2 once we exclusively protect `target`s. + if useDeprecatedProtectedTSStorage(ctx, p.settings, p.knobs) { + return p.deprecatedGetRecord(ctx, txn, id) + } + row, err := p.ex.QueryRowEx(ctx, "protectedts-GetRecord", txn, sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, getRecordQuery, id.GetBytesMut()) @@ -148,7 +252,7 @@ func (p *storage) GetRecord(ctx context.Context, txn *kv.Txn, id uuid.UUID) (*pt return nil, protectedts.ErrNotExists } var r ptpb.Record - if err := rowToRecord(ctx, row, &r); err != nil { + if err := rowToRecord(ctx, row, &r, p.settings, p.knobs); err != nil { return nil, err } return &r, nil @@ -225,18 +329,45 @@ func (p *storage) GetState(ctx context.Context, txn *kv.Txn) (ptpb.State, error) }, nil } +func (p *storage) deprecatedGetRecords(ctx context.Context, txn *kv.Txn) ([]ptpb.Record, error) { + it, err := p.ex.QueryIteratorEx(ctx, "protectedts-deprecated-GetRecords", txn, + sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, + getRecordsWithoutTargetQuery) + if err != nil { + return nil, errors.Wrap(err, "failed to read records") + } + + var ok bool + var records []ptpb.Record + for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { + var record ptpb.Record + if err := rowToRecord(ctx, it.Cur(), &record, p.settings, p.knobs); err != nil { + log.Errorf(ctx, "failed to parse row as record: %v", err) + } + records = append(records, record) + } + if err != nil { + return nil, errors.Wrap(err, "failed to read records") + } + return records, nil +} + func (p *storage) getRecords(ctx context.Context, txn *kv.Txn) ([]ptpb.Record, error) { + if useDeprecatedProtectedTSStorage(ctx, p.settings, p.knobs) { + return p.deprecatedGetRecords(ctx, txn) + } + it, err := p.ex.QueryIteratorEx(ctx, "protectedts-GetRecords", txn, - sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, - getRecordsQuery) + sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, getRecordsQuery) if err != nil { return nil, errors.Wrap(err, "failed to read records") } + var ok bool var records []ptpb.Record for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { var record ptpb.Record - if err := rowToRecord(ctx, it.Cur(), &record); err != nil { + if err := rowToRecord(ctx, it.Cur(), &record, p.settings, p.knobs); err != nil { log.Errorf(ctx, "failed to parse row as record: %v", err) } records = append(records, record) @@ -252,7 +383,13 @@ func (p *storage) getRecords(ctx context.Context, txn *kv.Txn) ([]ptpb.Record, e // they are logged but not returned. Returning an error due to malformed data // in the protected timestamp subsystem would create more problems than it would // solve. Malformed records can still be removed (and hopefully will be). -func rowToRecord(ctx context.Context, row tree.Datums, r *ptpb.Record) error { +func rowToRecord( + ctx context.Context, + row tree.Datums, + r *ptpb.Record, + st *cluster.Settings, + knobs *protectedts.TestingKnobs, +) error { r.ID = row[0].(*tree.DUuid).UUID.GetBytes() tsDecimal := row[1].(*tree.DDecimal) ts, err := tree.DecimalToHLC(&tsDecimal.Decimal) @@ -269,10 +406,18 @@ func rowToRecord(ctx context.Context, row tree.Datums, r *ptpb.Record) error { } var spans Spans if err := protoutil.Unmarshal([]byte(*row[4].(*tree.DBytes)), &spans); err != nil { - return errors.Wrapf(err, "failed to unmarshal spans for %v", r.ID) + return errors.Wrapf(err, "failed to unmarshal span for %v", r.ID) } r.DeprecatedSpans = spans.Spans r.Verified = bool(*row[5].(*tree.DBool)) + + if !useDeprecatedProtectedTSStorage(ctx, st, knobs) { + target := &ptpb.Target{} + if err := protoutil.Unmarshal([]byte(*row[6].(*tree.DBytes)), target); err != nil { + return errors.Wrapf(err, "failed to unmarshal target for %v", r.ID) + } + r.Target = target + } return nil } @@ -292,18 +437,25 @@ var ( errZeroTimestamp = errors.New("invalid zero value timestamp") errZeroID = errors.New("invalid zero value ID") errEmptySpans = errors.Errorf("invalid empty set of spans") + errNilTarget = errors.Errorf("invalid nil target") errInvalidMeta = errors.Errorf("invalid Meta with empty MetaType") errCreateVerifiedRecord = errors.Errorf("cannot create a verified record") ) -func validateRecordForProtect(r *ptpb.Record) error { +func validateRecordForProtect( + ctx context.Context, r *ptpb.Record, st *cluster.Settings, knobs *protectedts.TestingKnobs, +) error { if r.Timestamp.IsEmpty() { return errZeroTimestamp } if r.ID.GetUUID() == uuid.Nil { return errZeroID } - if len(r.DeprecatedSpans) == 0 { + useDeprecatedPTSStorage := useDeprecatedProtectedTSStorage(ctx, st, knobs) + if !useDeprecatedPTSStorage && r.Target == nil { + return errNilTarget + } + if useDeprecatedPTSStorage && len(r.DeprecatedSpans) == 0 { return errEmptySpans } if len(r.Meta) > 0 && len(r.MetaType) == 0 { diff --git a/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go b/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go index e7e9a576359e..e1c8a1d7c0da 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go @@ -20,6 +20,7 @@ import ( "sort" "strconv" "testing" + "unsafe" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" @@ -32,10 +33,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -48,8 +52,15 @@ import ( ) func TestStorage(t *testing.T) { - for _, test := range testCases { - t.Run(test.name, test.run) + for _, withDeprecatedSpans := range []bool{true, false} { + for _, test := range testCases { + name := test.name + if withDeprecatedSpans { + name = fmt.Sprintf("%s_withDeprecatedSpans", name) + test.runWithDeprecatedSpans = true + } + t.Run(name, test.run) + } } } @@ -57,14 +68,17 @@ var testCases = []testCase{ { name: "Protect - simple positive", ops: []op{ - protectOp{spans: tableSpans(42)}, + protectOp{ + target: tableTarget(42), + spans: tableSpans(42), + }, }, }, { - name: "Protect - no spans", + name: "Protect - no targets", ops: []op{ protectOp{ - expErr: "invalid empty set of spans", + expErr: "invalid (nil target|empty set of spans)", }, }, }, @@ -72,7 +86,7 @@ var testCases = []testCase{ name: "Protect - zero timestamp", ops: []op{ funcOp(func(ctx context.Context, t *testing.T, tCtx *testContext) { - rec := newRecord(hlc.Timestamp{}, "", nil, tableSpan(42)) + rec := newRecord(tCtx, hlc.Timestamp{}, "", nil, tableTarget(42), tableSpan(42)) err := tCtx.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return tCtx.pts.Protect(ctx, txn, &rec) }) @@ -84,7 +98,8 @@ var testCases = []testCase{ name: "Protect - already verified", ops: []op{ funcOp(func(ctx context.Context, t *testing.T, tCtx *testContext) { - rec := newRecord(tCtx.tc.Server(0).Clock().Now(), "", nil, tableSpan(42)) + rec := newRecord(tCtx, tCtx.tc.Server(0).Clock().Now(), "", nil, tableTarget(42), + tableSpan(42)) rec.Verified = true err := tCtx.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return tCtx.pts.Protect(ctx, txn, &rec) @@ -96,7 +111,7 @@ var testCases = []testCase{ { name: "Protect - already exists", ops: []op{ - protectOp{spans: tableSpans(42)}, + protectOp{spans: tableSpans(42), target: tableTarget(42)}, funcOp(func(ctx context.Context, t *testing.T, tCtx *testContext) { // When max_bytes or max_spans is set to 0 (i.e. unlimited), and a // protect op fails because the record already exists, we should report @@ -108,7 +123,7 @@ var testCases = []testCase{ require.NoError(t, err) }), funcOp(func(ctx context.Context, t *testing.T, tCtx *testContext) { - rec := newRecord(tCtx.tc.Server(0).Clock().Now(), "", nil, tableSpan(42)) + rec := newRecord(tCtx, tCtx.tc.Server(0).Clock().Now(), "", nil, tableTarget(42), tableSpan(42)) rec.ID = pickOneRecord(tCtx).GetBytes() err := tCtx.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return tCtx.pts.Protect(ctx, txn, &rec) @@ -146,11 +161,12 @@ var testCases = []testCase{ expErr: "protectedts: limit exceeded: 3\\+1 > 3 spans", }, }, + runWithDeprecatedSpans: true, }, { name: "Protect - too many bytes", ops: []op{ - protectOp{spans: tableSpans(42)}, + protectOp{spans: tableSpans(42), target: tableTarget(42)}, funcOp(func(ctx context.Context, t *testing.T, tCtx *testContext) { _, err := tCtx.tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.protectedts.max_bytes = $1", 1024) require.NoError(t, err) @@ -162,17 +178,19 @@ var testCases = []testCase{ s.EndKey = append(s.EndKey, bytes.Repeat([]byte{'a'}, 1024)...) return s }()), - expErr: "protectedts: limit exceeded: 8\\+1050 > 1024 bytes", + target: largeTableTarget(1024), + expErr: "protectedts: limit exceeded: .* bytes", }, protectOp{ - spans: tableSpans(1, 2), + spans: tableSpans(1, 2), + target: tableTargets(1, 2), }, }, }, { name: "Protect - unlimited bytes", ops: []op{ - protectOp{spans: tableSpans(42)}, + protectOp{spans: tableSpans(42), target: tableTarget(42)}, funcOp(func(ctx context.Context, t *testing.T, tCtx *testContext) { _, err := tCtx.tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.protectedts.max_bytes = $1", 0) require.NoError(t, err) @@ -184,9 +202,11 @@ var testCases = []testCase{ s.EndKey = append(s.EndKey, bytes.Repeat([]byte{'a'}, 2<<20 /* 2 MiB */)...) return s }()), + target: largeTableTarget(2 << 20 /* 2 MiB */), }, protectOp{ - spans: tableSpans(1, 2), + spans: tableSpans(1, 2), + target: tableTargets(1, 2), }, }, }, @@ -212,6 +232,7 @@ var testCases = []testCase{ spans: tableSpans(1, 2), }, }, + runWithDeprecatedSpans: true, }, { name: "GetRecord - does not exist", @@ -230,7 +251,7 @@ var testCases = []testCase{ { name: "MarkVerified", ops: []op{ - protectOp{spans: tableSpans(42)}, + protectOp{target: tableTarget(42), spans: tableSpans(42)}, markVerifiedOp{idFunc: pickOneRecord}, markVerifiedOp{idFunc: pickOneRecord}, // it's idempotent markVerifiedOp{ @@ -242,7 +263,7 @@ var testCases = []testCase{ { name: "Release", ops: []op{ - protectOp{spans: tableSpans(42)}, + protectOp{target: tableTarget(42), spans: tableSpans(42)}, releaseOp{idFunc: pickOneRecord}, releaseOp{ idFunc: randomID, @@ -253,7 +274,7 @@ var testCases = []testCase{ { name: "UpdateTimestamp", ops: []op{ - protectOp{spans: tableSpans(42)}, + protectOp{spans: tableSpans(42), target: tableTarget(42)}, updateTimestampOp{ expectedRecordFn: func(record ptpb.Record) ptpb.Record { record.Timestamp = hlc.Timestamp{WallTime: 1} @@ -278,7 +299,7 @@ var testCases = []testCase{ name: "nil transaction errors", ops: []op{ funcOp(func(ctx context.Context, t *testing.T, tCtx *testContext) { - rec := newRecord(tCtx.tc.Server(0).Clock().Now(), "", nil, tableSpan(42)) + rec := newRecord(tCtx, tCtx.tc.Server(0).Clock().Now(), "", nil, tableTarget(42), tableSpan(42)) const msg = "must provide a non-nil transaction" require.Regexp(t, msg, tCtx.pts.Protect(ctx, nil /* txn */, &rec).Error()) require.Regexp(t, msg, tCtx.pts.Release(ctx, nil /* txn */, uuid.MakeV4()).Error()) @@ -299,6 +320,11 @@ type testContext struct { tc *testcluster.TestCluster db *kv.DB + // If set to false, the test will be run with + // `EnableProtectedTimestampForMultiTenant` set to true, thereby testing the + // "new" protected timestamp logic that runs on targets instead of spans. + runWithDeprecatedSpans bool + state ptpb.State } @@ -337,8 +363,14 @@ func (r releaseOp) run(ctx context.Context, t *testing.T, tCtx *testContext) { tCtx.state.Version++ tCtx.state.NumRecords-- tCtx.state.NumSpans -= uint64(len(rec.DeprecatedSpans)) - encoded, err := protoutil.Marshal(&ptstorage.Spans{Spans: rec.DeprecatedSpans}) - require.NoError(t, err) + var encoded []byte + if tCtx.runWithDeprecatedSpans { + encoded, err = protoutil.Marshal(&ptstorage.Spans{Spans: rec.DeprecatedSpans}) + require.NoError(t, err) + } else { + encoded, err = protoutil.Marshal(&ptpb.Target{Union: rec.Target.GetUnion()}) + require.NoError(t, err) + } tCtx.state.TotalBytes -= uint64(len(encoded) + len(rec.Meta) + len(rec.MetaType)) } } @@ -369,11 +401,12 @@ type protectOp struct { metaType string meta []byte spans []roachpb.Span + target *ptpb.Target expErr string } func (p protectOp) run(ctx context.Context, t *testing.T, tCtx *testContext) { - rec := newRecord(tCtx.tc.Server(0).Clock().Now(), p.metaType, p.meta, p.spans...) + rec := newRecord(tCtx, tCtx.tc.Server(0).Clock().Now(), p.metaType, p.meta, p.target, p.spans...) if p.idFunc != nil { rec.ID = p.idFunc(tCtx).GetBytes() } @@ -393,8 +426,14 @@ func (p protectOp) run(ctx context.Context, t *testing.T, tCtx *testContext) { tCtx.state.Version++ tCtx.state.NumRecords++ tCtx.state.NumSpans += uint64(len(rec.DeprecatedSpans)) - encoded, err := protoutil.Marshal(&ptstorage.Spans{Spans: p.spans}) - require.NoError(t, err) + var encoded []byte + if tCtx.runWithDeprecatedSpans { + encoded, err = protoutil.Marshal(&ptstorage.Spans{Spans: rec.DeprecatedSpans}) + require.NoError(t, err) + } else { + encoded, err = protoutil.Marshal(&ptpb.Target{Union: rec.Target.GetUnion()}) + require.NoError(t, err) + } tCtx.state.TotalBytes += uint64(len(encoded) + len(p.meta) + len(p.metaType)) } } @@ -423,23 +462,31 @@ func (p updateTimestampOp) run(ctx context.Context, t *testing.T, tCtx *testCont } type testCase struct { - name string - ops []op + name string + ops []op + runWithDeprecatedSpans bool } func (test testCase) run(t *testing.T) { ctx := context.Background() - tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + var params base.TestServerArgs + + ptsKnobs := &protectedts.TestingKnobs{} + if !test.runWithDeprecatedSpans { + ptsKnobs.EnableProtectedTimestampForMultiTenant = true + params.Knobs.ProtectedTS = ptsKnobs + } + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params}) defer tc.Stopper().Stop(ctx) s := tc.Server(0) - pts := ptstorage.New(s.ClusterSettings(), - s.InternalExecutor().(*sql.InternalExecutor)) + pts := ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(*sql.InternalExecutor), ptsKnobs) db := s.DB() tCtx := testContext{ - pts: pts, - db: db, - tc: tc, + pts: pts, + db: db, + tc: tc, + runWithDeprecatedSpans: test.runWithDeprecatedSpans, } verify := func(t *testing.T) { var state ptpb.State @@ -486,6 +533,28 @@ func pickOneRecord(tCtx *testContext) uuid.UUID { return tCtx.state.Records[rand.Intn(numRecords)].ID.GetUUID() } +func tableTargets(ids ...uint32) *ptpb.Target { + var tableIDs []descpb.ID + for _, id := range ids { + tableIDs = append(tableIDs, descpb.ID(id)) + } + return ptpb.MakeSchemaObjectsTarget(tableIDs) +} + +func tableTarget(tableID uint32) *ptpb.Target { + return ptpb.MakeSchemaObjectsTarget([]descpb.ID{descpb.ID(tableID)}) +} + +func largeTableTarget(targetBytesSize int64) *ptpb.Target { + var tableID descpb.ID + idSize := int64(unsafe.Sizeof(tableID)) + ids := make([]descpb.ID, 0) + for i := int64(0); i < targetBytesSize/idSize; i++ { + ids = append(ids, descpb.ID(rand.Uint32())) + } + return ptpb.MakeSchemaObjectsTarget(ids) +} + func tableSpan(tableID uint32) roachpb.Span { return roachpb.Span{ Key: keys.SystemSQLCodec.TablePrefix(tableID), @@ -501,7 +570,19 @@ func tableSpans(tableIDs ...uint32) []roachpb.Span { return spans } -func newRecord(ts hlc.Timestamp, metaType string, meta []byte, spans ...roachpb.Span) ptpb.Record { +func newRecord( + tCtx *testContext, + ts hlc.Timestamp, + metaType string, + meta []byte, + target *ptpb.Target, + spans ...roachpb.Span, +) ptpb.Record { + if tCtx.runWithDeprecatedSpans { + target = nil + } else { + spans = nil + } return ptpb.Record{ ID: uuid.MakeV4().GetBytes(), Timestamp: ts, @@ -509,6 +590,7 @@ func newRecord(ts hlc.Timestamp, metaType string, meta []byte, spans ...roachpb. MetaType: metaType, Meta: meta, DeprecatedSpans: spans, + Target: target, } } @@ -531,33 +613,27 @@ func newRecord(ts hlc.Timestamp, metaType string, meta []byte, spans ...roachpb. func TestCorruptData(t *testing.T) { ctx := context.Background() - t.Run("corrupt spans", func(t *testing.T) { - // Set the log scope so we can introspect the logged errors. - scope := log.Scope(t) - defer scope.Close(t) - - tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) - defer tc.Stopper().Stop(ctx) - - s := tc.Server(0) - pts := ptstorage.New(s.ClusterSettings(), - s.InternalExecutor().(*sql.InternalExecutor)) - - rec := newRecord(s.Clock().Now(), "foo", []byte("bar"), tableSpan(42)) + runCorruptDataTest := func(tCtx *testContext, s serverutils.TestServerInterface, + tc *testcluster.TestCluster, pts protectedts.Storage) { + rec := newRecord(tCtx, s.Clock().Now(), "foo", []byte("bar"), tableTarget(42), tableSpan(42)) require.NoError(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return pts.Protect(ctx, txn, &rec) })) ie := tc.Server(0).InternalExecutor().(sqlutil.InternalExecutor) + updateQuery := "UPDATE system.protected_ts_records SET target = $1 WHERE id = $2" + if tCtx.runWithDeprecatedSpans { + updateQuery = "UPDATE system.protected_ts_records SET spans = $1 WHERE id = $2" + } affected, err := ie.ExecEx( ctx, "corrupt-data", nil, /* txn */ sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, - "UPDATE system.protected_ts_records SET spans = $1 WHERE id = $2", + updateQuery, []byte("junk"), rec.ID.String()) require.NoError(t, err) require.Equal(t, 1, affected) var got *ptpb.Record - msg := regexp.MustCompile("failed to unmarshal spans for " + rec.ID.String() + ": ") + msg := regexp.MustCompile("failed to unmarshal (span|target) for " + rec.ID.String() + ": ") require.Regexp(t, msg, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { got, err = pts.GetRecord(ctx, txn, rec.ID.GetUUID()) @@ -576,20 +652,55 @@ func TestCorruptData(t *testing.T) { for _, e := range entries { require.Equal(t, severity.ERROR, e.Severity) } + } + + // TODO(adityamaru): Remove test when we delete `spans` field from + // record. + t.Run("corrupt spans", func(t *testing.T) { + // Set the log scope so we can introspect the logged errors. + scope := log.Scope(t) + defer scope.Close(t) + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + s := tc.Server(0) + pts := ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(*sql.InternalExecutor), + nil /* knobs */) + + tCtx := &testContext{runWithDeprecatedSpans: true} + runCorruptDataTest(tCtx, s, tc, pts) + }) + t.Run("corrupt target", func(t *testing.T) { + // Set the log scope so we can introspect the logged errors. + scope := log.Scope(t) + defer scope.Close(t) + + params, _ := tests.CreateTestServerParams() + ptsKnobs := &protectedts.TestingKnobs{EnableProtectedTimestampForMultiTenant: true} + params.Knobs.ProtectedTS = ptsKnobs + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params}) + defer tc.Stopper().Stop(ctx) + + s := tc.Server(0) + pts := ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(*sql.InternalExecutor), ptsKnobs) + runCorruptDataTest(&testContext{}, s, tc, pts) }) t.Run("corrupt hlc timestamp", func(t *testing.T) { // Set the log scope so we can introspect the logged errors. scope := log.Scope(t) defer scope.Close(t) - tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + params, _ := tests.CreateTestServerParams() + ptsKnobs := &protectedts.TestingKnobs{EnableProtectedTimestampForMultiTenant: true} + params.Knobs.ProtectedTS = ptsKnobs + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params}) defer tc.Stopper().Stop(ctx) s := tc.Server(0) - pts := ptstorage.New(s.ClusterSettings(), - s.InternalExecutor().(*sql.InternalExecutor)) + pts := ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(*sql.InternalExecutor), ptsKnobs) - rec := newRecord(s.Clock().Now(), "foo", []byte("bar"), tableSpan(42)) + rec := newRecord(&testContext{}, s.Clock().Now(), "foo", []byte("bar"), tableTarget(42), tableSpan(42)) require.NoError(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return pts.Protect(ctx, txn, &rec) })) @@ -635,18 +746,22 @@ func TestCorruptData(t *testing.T) { // are properly transmitted back to the client. func TestErrorsFromSQL(t *testing.T) { ctx := context.Background() - tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + params, _ := tests.CreateTestServerParams() + ptsKnobs := &protectedts.TestingKnobs{EnableProtectedTimestampForMultiTenant: true} + params.Knobs.ProtectedTS = ptsKnobs + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params}) defer tc.Stopper().Stop(ctx) s := tc.Server(0) ie := s.InternalExecutor().(sqlutil.InternalExecutor) wrappedIE := &wrappedInternalExecutor{wrapped: ie} - pts := ptstorage.New(s.ClusterSettings(), wrappedIE) + pts := ptstorage.New(s.ClusterSettings(), wrappedIE, ptsKnobs) wrappedIE.setErrFunc(func(string) error { return errors.New("boom") }) - rec := newRecord(s.Clock().Now(), "foo", []byte("bar"), tableSpan(42)) + rec := newRecord(&testContext{}, s.Clock().Now(), "foo", []byte("bar"), tableTarget(42), tableSpan(42)) require.EqualError(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return pts.Protect(ctx, txn, &rec) }), fmt.Sprintf("failed to write record %v: boom", rec.ID)) diff --git a/pkg/kv/kvserver/protectedts/ptstorage/validate_test.go b/pkg/kv/kvserver/protectedts/ptstorage/validate_test.go index 99a5a296d003..97f20918969d 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/validate_test.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/validate_test.go @@ -11,61 +11,58 @@ package ptstorage import ( + "context" "strconv" "testing" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" - roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/stretchr/testify/require" ) func TestValidateRecordForProtect(t *testing.T) { - spans := []roachpb.Span{ - { - Key: roachpb.Key("a"), - EndKey: roachpb.Key("b"), - }, - } + target := ptpb.MakeClusterTarget() for i, tc := range []struct { r *ptpb.Record err error }{ { r: &ptpb.Record{ - ID: uuid.MakeV4().GetBytes(), - Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1}, - MetaType: "job", - Meta: []byte("junk"), - DeprecatedSpans: spans, + ID: uuid.MakeV4().GetBytes(), + Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1}, + MetaType: "job", + Meta: []byte("junk"), + Target: target, }, err: nil, }, { r: &ptpb.Record{ - Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1}, - MetaType: "job", - Meta: []byte("junk"), - DeprecatedSpans: spans, + Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1}, + MetaType: "job", + Meta: []byte("junk"), + Target: target, }, err: errZeroID, }, { r: &ptpb.Record{ - ID: uuid.MakeV4().GetBytes(), - MetaType: "job", - Meta: []byte("junk"), - DeprecatedSpans: spans, + ID: uuid.MakeV4().GetBytes(), + MetaType: "job", + Meta: []byte("junk"), + Target: target, }, err: errZeroTimestamp, }, { r: &ptpb.Record{ - ID: uuid.MakeV4().GetBytes(), - Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1}, - Meta: []byte("junk"), - DeprecatedSpans: spans, + ID: uuid.MakeV4().GetBytes(), + Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1}, + Meta: []byte("junk"), + Target: target, }, err: errInvalidMeta, }, @@ -76,11 +73,28 @@ func TestValidateRecordForProtect(t *testing.T) { MetaType: "job", Meta: []byte("junk"), }, - err: errEmptySpans, + err: errNilTarget, }, } { t.Run(strconv.Itoa(i), func(t *testing.T) { - require.Equal(t, validateRecordForProtect(tc.r), tc.err) + st := cluster.MakeTestingClusterSettings() + require.Equal(t, validateRecordForProtect(context.Background(), tc.r, st, + &protectedts.TestingKnobs{EnableProtectedTimestampForMultiTenant: true}), tc.err) + }) + + // Test that prior to the `AlterSystemProtectedTimestampAddColumn` migration + // we validate that records have a non-nil `Spans` field. + t.Run("errEmptySpans", func(t *testing.T) { + r := &ptpb.Record{ + ID: uuid.MakeV4().GetBytes(), + Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1}, + MetaType: "job", + Meta: []byte("junk"), + Target: target, + } + st := cluster.MakeTestingClusterSettings() + require.Equal(t, validateRecordForProtect(context.Background(), r, st, + &protectedts.TestingKnobs{}), errEmptySpans) }) } } diff --git a/pkg/kv/kvserver/protectedts/ptverifier/verifier.go b/pkg/kv/kvserver/protectedts/ptverifier/verifier.go index 26f7d5af6679..c3f46349c80d 100644 --- a/pkg/kv/kvserver/protectedts/ptverifier/verifier.go +++ b/pkg/kv/kvserver/protectedts/ptverifier/verifier.go @@ -45,6 +45,13 @@ func (v *verifier) Verify(ctx context.Context, id uuid.UUID) error { return errors.Wrapf(err, "failed to fetch record %s", id) } + // TODO(adityamaru): Remove this once we delete all `Verify` calls. The new + // subsystem is not going to provide verification semantics. Until then mark + // the record as verified so it is a noop. + if r.DeprecatedSpans == nil { + return nil + } + if r.Verified { // already verified return nil } diff --git a/pkg/kv/kvserver/protectedts/ptverifier/verifier_test.go b/pkg/kv/kvserver/protectedts/ptverifier/verifier_test.go index 311abbe1032a..de87dfbd5339 100644 --- a/pkg/kv/kvserver/protectedts/ptverifier/verifier_test.go +++ b/pkg/kv/kvserver/protectedts/ptverifier/verifier_test.go @@ -63,7 +63,7 @@ func TestVerifier(t *testing.T) { }), ) - pts := ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(sqlutil.InternalExecutor)) + pts := ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */) withDB := ptstorage.WithDatabase(pts, s.DB()) db := kv.NewDB(s.DB().AmbientContext, tsf, s.Clock(), s.Stopper()) ptv := ptverifier.New(db, pts) diff --git a/pkg/kv/kvserver/protectedts/testing_knobs.go b/pkg/kv/kvserver/protectedts/testing_knobs.go new file mode 100644 index 000000000000..5a57770f930b --- /dev/null +++ b/pkg/kv/kvserver/protectedts/testing_knobs.go @@ -0,0 +1,29 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package protectedts + +import "github.com/cockroachdb/cockroach/pkg/base" + +// TestingKnobs provide fine-grained control over the various span config +// components for testing. +type TestingKnobs struct { + // EnableProtectedTimestampForMultiTenant when set to true, runs the protected + // timestamp subsystem that depends on span configuration reconciliation. + // + // TODO(adityamaru,arulajmani): Default this to true, or flip it to + // `DisableProtectedTimestampForMultiTenant` prior to release 22.1. + EnableProtectedTimestampForMultiTenant bool +} + +// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. +func (t *TestingKnobs) ModuleTestingKnobs() {} + +var _ base.ModuleTestingKnobs = (*TestingKnobs)(nil) diff --git a/pkg/server/server.go b/pkg/server/server.go index 66da809a3230..afaf015edc87 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -580,10 +580,13 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { user security.SQLUsername) (cloud.ExternalStorage, error) { return externalStorageBuilder.makeExternalStorageFromURI(ctx, uri, user) } + + protectedtsKnobs, _ := cfg.TestingKnobs.ProtectedTS.(*protectedts.TestingKnobs) protectedtsProvider, err := ptprovider.New(ptprovider.Config{ DB: db, InternalExecutor: internalExecutor, Settings: st, + Knobs: protectedtsKnobs, }) if err != nil { return nil, err diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 06cc4b4446b2..c74b18a3913b 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -450,10 +450,12 @@ func makeTenantSQLServerArgs( // clusters. var protectedTSProvider protectedts.Provider { + protectedtsKnobs, _ := baseCfg.TestingKnobs.ProtectedTS.(*protectedts.TestingKnobs) pp, err := ptprovider.New(ptprovider.Config{ DB: db, InternalExecutor: circularInternalExecutor, Settings: st, + Knobs: protectedtsKnobs, }) if err != nil { panic(err)