diff --git a/pkg/jobs/registry_test.go b/pkg/jobs/registry_test.go
index 1ed0d478d892..94428bb2279d 100644
--- a/pkg/jobs/registry_test.go
+++ b/pkg/jobs/registry_test.go
@@ -93,38 +93,16 @@ func writeMutation(
 	}
 }
 
-func writeGCMutation(
-	t *testing.T,
-	kvDB *kv.DB,
-	tableDesc *tabledesc.Mutable,
-	m descpb.TableDescriptor_GCDescriptorMutation,
-) {
-	tableDesc.GCMutations = append(tableDesc.GCMutations, m)
-	tableDesc.Version++
-	if err := catalog.ValidateSelf(tableDesc); err != nil {
-		t.Fatal(err)
-	}
-	if err := kvDB.Put(
-		context.Background(),
-		catalogkeys.MakeDescMetadataKey(keys.SystemSQLCodec, tableDesc.GetID()),
-		tableDesc.DescriptorProto(),
-	); err != nil {
-		t.Fatal(err)
-	}
-}
-
 type mutationOptions struct {
 	// Set if the desc should have any mutations of any sort.
 	hasMutation bool
-	// Set if the mutation being inserted is a GCMutation.
-	hasGCMutation bool
 	// Set if the desc should have a job that is dropping it.
 	hasDropJob bool
 }
 
 func (m mutationOptions) string() string {
-	return fmt.Sprintf("hasMutation=%s_hasGCMutation=%s_hasDropJob=%s",
-		strconv.FormatBool(m.hasMutation), strconv.FormatBool(m.hasGCMutation),
+	return fmt.Sprintf("hasMutation=%s_hasDropJob=%s",
+		strconv.FormatBool(m.hasMutation),
 		strconv.FormatBool(m.hasDropJob))
 }
 
@@ -172,7 +150,7 @@ func TestRegistryGC(t *testing.T) {
 	writeJob := func(name string, created, finished time.Time, status Status, mutOptions mutationOptions) string {
 		tableName := constructTableName(name, mutOptions)
 		if _, err := sqlDB.Exec(fmt.Sprintf(`
-CREATE DATABASE IF NOT EXISTS t; 
+CREATE DATABASE IF NOT EXISTS t;
 CREATE TABLE t."%s" (k VARCHAR PRIMARY KEY DEFAULT 'default', v VARCHAR,i VARCHAR NOT NULL DEFAULT 'i');
 INSERT INTO t."%s" VALUES('a', 'foo');
 `, tableName, tableName)); err != nil {
@@ -187,10 +165,6 @@ INSERT INTO t."%s" VALUES('a', 'foo');
 			writeColumnMutation(t, kvDB, tableDesc, "i", descpb.DescriptorMutation{State: descpb.
 				DescriptorMutation_DELETE_AND_WRITE_ONLY, Direction: descpb.DescriptorMutation_DROP})
 		}
-		if mutOptions.hasGCMutation {
-			writeGCMutation(t, kvDB, tableDesc, descpb.TableDescriptor_GCDescriptorMutation{})
-		}
-
 		payload, err := protoutil.Marshal(&jobspb.Payload{
 			Description: name,
 			// register a mutation on the table so that jobs that reference
@@ -222,58 +196,54 @@ INSERT INTO t."%s" VALUES('a', 'foo');
 
 	// Test the descriptor when any of the following are set.
 	// 1. Mutations
-	// 2. GC Mutations
-	// 3. A drop job
+	// 2. A drop job
 	for _, hasMutation := range []bool{true, false} {
-		for _, hasGCMutation := range []bool{true, false} {
-			for _, hasDropJob := range []bool{true, false} {
-				if !hasMutation && !hasGCMutation && !hasDropJob {
-					continue
-				}
-				mutOptions := mutationOptions{
-					hasMutation:   hasMutation,
-					hasGCMutation: hasGCMutation,
-					hasDropJob:    hasDropJob,
-				}
-				oldRunningJob := writeJob("old_running", muchEarlier, time.Time{}, StatusRunning, mutOptions)
-				oldSucceededJob := writeJob("old_succeeded", muchEarlier, muchEarlier.Add(time.Minute), StatusSucceeded, mutOptions)
-				oldFailedJob := writeJob("old_failed", muchEarlier, muchEarlier.Add(time.Minute),
-					StatusFailed, mutOptions)
-				oldRevertFailedJob := writeJob("old_revert_failed", muchEarlier, muchEarlier.Add(time.Minute),
-					StatusRevertFailed, mutOptions)
-				oldCanceledJob := writeJob("old_canceled", muchEarlier, muchEarlier.Add(time.Minute),
-					StatusCanceled, mutOptions)
-				newRunningJob := writeJob("new_running", earlier, earlier.Add(time.Minute), StatusRunning,
-					mutOptions)
-				newSucceededJob := writeJob("new_succeeded", earlier, earlier.Add(time.Minute), StatusSucceeded, mutOptions)
-				newFailedJob := writeJob("new_failed", earlier, earlier.Add(time.Minute), StatusFailed, mutOptions)
-				newRevertFailedJob := writeJob("new_revert_failed", earlier, earlier.Add(time.Minute), StatusRevertFailed, mutOptions)
-				newCanceledJob := writeJob("new_canceled", earlier, earlier.Add(time.Minute),
-					StatusCanceled, mutOptions)
-
-				db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
-					{oldRunningJob}, {oldSucceededJob}, {oldFailedJob}, {oldRevertFailedJob}, {oldCanceledJob},
-					{newRunningJob}, {newSucceededJob}, {newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})
-
-				if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, earlier); err != nil {
-					t.Fatal(err)
-				}
-				db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
-					{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newSucceededJob},
-					{newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})
+		for _, hasDropJob := range []bool{true, false} {
+			if !hasMutation && !hasDropJob {
+				continue
+			}
+			mutOptions := mutationOptions{
+				hasMutation: hasMutation,
+				hasDropJob:  hasDropJob,
+			}
+			oldRunningJob := writeJob("old_running", muchEarlier, time.Time{}, StatusRunning, mutOptions)
+			oldSucceededJob := writeJob("old_succeeded", muchEarlier, muchEarlier.Add(time.Minute), StatusSucceeded, mutOptions)
+			oldFailedJob := writeJob("old_failed", muchEarlier, muchEarlier.Add(time.Minute),
+				StatusFailed, mutOptions)
+			oldRevertFailedJob := writeJob("old_revert_failed", muchEarlier, muchEarlier.Add(time.Minute),
+				StatusRevertFailed, mutOptions)
+			oldCanceledJob := writeJob("old_canceled", muchEarlier, muchEarlier.Add(time.Minute),
+				StatusCanceled, mutOptions)
+			newRunningJob := writeJob("new_running", earlier, earlier.Add(time.Minute), StatusRunning,
+				mutOptions)
+			newSucceededJob := writeJob("new_succeeded", earlier, earlier.Add(time.Minute), StatusSucceeded, mutOptions)
+			newFailedJob := writeJob("new_failed", earlier, earlier.Add(time.Minute), StatusFailed, mutOptions)
+			newRevertFailedJob := writeJob("new_revert_failed", earlier, earlier.Add(time.Minute), StatusRevertFailed, mutOptions)
+			newCanceledJob := writeJob("new_canceled", earlier, earlier.Add(time.Minute),
+				StatusCanceled, mutOptions)
+
+			db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
+				{oldRunningJob}, {oldSucceededJob}, {oldFailedJob}, {oldRevertFailedJob}, {oldCanceledJob},
+				{newRunningJob}, {newSucceededJob}, {newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})
+
+			if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, earlier); err != nil {
+				t.Fatal(err)
+			}
+			db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
+				{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newSucceededJob},
+				{newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})
 
-				if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, ts.Add(time.Minute*-10)); err != nil {
-					t.Fatal(err)
-				}
-				db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
-					{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newRevertFailedJob}})
-
-				// Delete the revert failed, and running jobs for the next run of the
-				// test.
-				_, err := sqlDB.Exec(`DELETE FROM system.jobs WHERE id = $1 OR id = $2 OR id = $3 OR id = $4`,
-					oldRevertFailedJob, newRevertFailedJob, oldRunningJob, newRunningJob)
-				require.NoError(t, err)
+			if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, ts.Add(time.Minute*-10)); err != nil {
+				t.Fatal(err)
 			}
+			db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
+				{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newRevertFailedJob}})
+
+			// Delete the revert-failed and running jobs for the next run of the
+			// test.
+			_, err := sqlDB.Exec(`DELETE FROM system.jobs WHERE id = $1 OR id = $2 OR id = $3 OR id = $4`,
+				oldRevertFailedJob, newRevertFailedJob, oldRunningJob, newRunningJob)
+			require.NoError(t, err)
 		}
 	}
 }
diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel
index 04316b9bb7d8..295ae01f942e 100644
--- a/pkg/kv/kvserver/BUILD.bazel
+++ b/pkg/kv/kvserver/BUILD.bazel
@@ -230,6 +230,7 @@ go_test(
         "client_split_burst_test.go",
         "client_split_test.go",
         "client_status_test.go",
+        "client_store_test.go",
        "client_tenant_test.go",
        "client_test.go",
        "closed_timestamp_test.go",
diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go
index 1463fb6713b4..a02d026924c2 100644
--- a/pkg/kv/kvserver/client_split_test.go
+++ b/pkg/kv/kvserver/client_split_test.go
@@ -1011,11 +1011,11 @@ func TestStoreZoneUpdateAndRangeSplit(t *testing.T) {
 	ctx := context.Background()
 	serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
+		// This test was written with the SystemConfigSpan in mind.
+		DisableSpanConfigs: true,
 		Knobs: base.TestingKnobs{
 			Store: &kvserver.StoreTestingKnobs{
 				DisableMergeQueue: true,
-				// This test was written with the SystemConfigSpan in mind.
-				UseSystemConfigSpanForQueues: true,
 			},
 		},
 	})
@@ -1082,11 +1082,11 @@ func TestStoreRangeSplitWithMaxBytesUpdate(t *testing.T) {
 	ctx := context.Background()
 	serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
+		// This test was written with the system config span in mind.
+		DisableSpanConfigs: true,
 		Knobs: base.TestingKnobs{
 			Store: &kvserver.StoreTestingKnobs{
 				DisableMergeQueue: true,
-				// This test was written with the system config span in mind.
-				UseSystemConfigSpanForQueues: true,
 			},
 		},
 	})
@@ -3562,12 +3562,11 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
 	ctx := context.Background()
 	serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
+		DisableSpanConfigs: true,
 		Knobs: base.TestingKnobs{
 			Store: &kvserver.StoreTestingKnobs{
-				DisableMergeQueue: true,
-				// This test was written with the system config span in mind.
-				UseSystemConfigSpanForQueues: true,
-				TestingResponseFilter:        respFilter,
+				DisableMergeQueue:     true,
+				TestingResponseFilter: respFilter,
 			},
 		},
 	})
diff --git a/pkg/kv/kvserver/client_store_test.go b/pkg/kv/kvserver/client_store_test.go
new file mode 100644
index 000000000000..f47006c66943
--- /dev/null
+++ b/pkg/kv/kvserver/client_store_test.go
@@ -0,0 +1,72 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package kvserver_test
+
+import (
+	"context"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/spanconfig"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/errors"
+)
+
+// TestStoreSetRangesMaxBytes creates a set of ranges via splitting and then
+// sets the zone config to a custom max bytes value to verify the ranges' max
+// bytes are updated appropriately.
+func TestStoreSetRangesMaxBytes(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	const expMaxBytes, defaultMaxBytes = 420 << 20, 512 << 20
+	ctx := context.Background()
+	tc := testcluster.StartTestCluster(t, 1,
+		base.TestClusterArgs{
+			ServerArgs: base.TestServerArgs{
+				Knobs: base.TestingKnobs{
+					SpanConfig: &spanconfig.TestingKnobs{
+						ConfigureScratchRange: true,
+					},
+				},
+			},
+		},
+	)
+	defer tc.Stopper().Stop(ctx)
+	store := tc.GetFirstStoreFromServer(t, 0)
+	tdb := sqlutils.MakeSQLRunner(tc.Conns[0])
+
+	tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`) // speeds up the test
+
+	testKey := tc.ScratchRange(t)
+	testutils.SucceedsSoon(t, func() error {
+		repl := store.LookupReplica(roachpb.RKey(testKey))
+		if got := repl.GetMaxBytes(); got != defaultMaxBytes {
+			return errors.Errorf("range max bytes values did not start at %d; got %d", defaultMaxBytes, got)
+		}
+		return nil
+	})
+
+	tdb.Exec(t, `ALTER RANGE DEFAULT CONFIGURE ZONE USING range_max_bytes = $1`, expMaxBytes)
+
+	testutils.SucceedsSoon(t, func() error {
+		repl := store.LookupReplica(roachpb.RKey(testKey))
+		if got := repl.GetMaxBytes(); got != expMaxBytes {
+			return errors.Errorf("range max bytes values did not change to %d; got %d", expMaxBytes, got)
+		}
+		return nil
+	})
+}
diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go
index 942f1b28f475..a8af1d8071fa 100644
--- a/pkg/kv/kvserver/replicate_queue_test.go
+++ b/pkg/kv/kvserver/replicate_queue_test.go
@@ -245,59 +245,62 @@ func TestReplicateQueueDownReplicate(t *testing.T) {
 	skip.UnderRace(t, "takes >1min under race")
 	ctx := context.Background()
 
-	const replicaCount = 3
-
-	// The goal of this test is to ensure that down replication occurs correctly
-	// using the replicate queue, and to ensure that's the case, the test
-	// cluster needs to be kept in auto replication mode.
-	tc := testcluster.StartTestCluster(t, replicaCount+2,
+	// The goal of this test is to ensure that down replication occurs
+	// correctly using the replicate queue, and to ensure that's the case,
+	// the test cluster needs to be kept in auto replication mode.
+	tc := testcluster.StartTestCluster(t, 3,
 		base.TestClusterArgs{
 			ReplicationMode: base.ReplicationAuto,
 			ServerArgs: base.TestServerArgs{
 				ScanMinIdleTime: 10 * time.Millisecond,
 				ScanMaxIdleTime: 10 * time.Millisecond,
+				Knobs: base.TestingKnobs{
+					SpanConfig: &spanconfig.TestingKnobs{
+						ConfigureScratchRange: true,
+					},
+				},
 			},
 		},
 	)
 	defer tc.Stopper().Stop(ctx)
 
-	// Disable the replication queues so that the range we're about to create
-	// doesn't get down-replicated too soon.
-	tc.ToggleReplicateQueues(false)
-
 	testKey := tc.ScratchRange(t)
-	desc := tc.LookupRangeOrFatal(t, testKey)
-	// At the end of StartTestCluster(), all ranges have 5 replicas since they're
-	// all "system ranges". When the ScratchRange() splits its range, it also
-	// starts up with 5 replicas. Since it's not a system range, its default zone
-	// config asks for 3x replication, and the replication queue will
-	// down-replicate it.
-	require.Len(t, desc.Replicas().Descriptors(), 5)
-	// Re-enable the replication queue.
-	tc.ToggleReplicateQueues(true)
+	testutils.SucceedsSoon(t, func() error {
+		desc := tc.LookupRangeOrFatal(t, testKey)
+		if got := len(desc.Replicas().Descriptors()); got != 3 {
+			return errors.Newf("expected 3 replicas for scratch range, found %d", got)
+		}
+		return nil
+	})
+
+	_, err := tc.ServerConn(0).Exec(
+		`ALTER RANGE DEFAULT CONFIGURE ZONE USING num_replicas = 1`,
+	)
+	require.NoError(t, err)
+
+	for _, s := range tc.Servers {
+		require.NoError(t, s.Stores().VisitStores(func(s *kvserver.Store) error {
+			require.NoError(t, s.ForceReplicationScanAndProcess())
+			return nil
+		}))
+	}
 
 	// Now wait until the replicas have been down-replicated back to the
 	// desired number.
 	testutils.SucceedsSoon(t, func() error {
-		descriptor, err := tc.LookupRange(testKey)
-		if err != nil {
-			t.Fatal(err)
-		}
-		if len(descriptor.InternalReplicas) != replicaCount {
-			return errors.Errorf("replica count, want %d, current %d", replicaCount, len(desc.InternalReplicas))
+		desc := tc.LookupRangeOrFatal(t, testKey)
+		if got := len(desc.Replicas().Descriptors()); got != 1 {
+			return errors.Errorf("expected 1 replica, found %d", got)
 		}
 		return nil
 	})
 
+	desc := tc.LookupRangeOrFatal(t, testKey)
 	infos, err := filterRangeLog(
 		tc.Conns[0], desc.RangeID, kvserverpb.RangeLogEventType_remove_voter, kvserverpb.ReasonRangeOverReplicated,
 	)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if len(infos) < 1 {
-		t.Fatalf("found no downreplication due to over-replication in the range logs")
-	}
+	require.NoError(t, err)
+	require.Truef(t, len(infos) >= 1, "found no down replication due to over-replication in the range logs")
 }
 
 func scanAndGetNumNonVoters(
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 190ea155c6f4..8e84576a13e8 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -1950,9 +1950,9 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
 			s.onSpanConfigUpdate(ctx, update)
 		})
 
-		// When toggling between the system config span and the span configs
-		// infrastructure, we want to re-apply configs on all replicas from
-		// whatever the new source is.
+		// When toggling between the system config span and the span
+		// configs infrastructure, we want to re-apply configs on all
+		// replicas from whatever the new source is.
 		spanconfigstore.EnabledSetting.SetOnChange(&s.ClusterSettings().SV, func(ctx context.Context) {
 			enabled := spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV)
 			if enabled {
@@ -2281,6 +2281,10 @@ func (s *Store) removeReplicaWithRangefeed(rangeID roachpb.RangeID) {
 // systemGossipUpdate is a callback for gossip updates to
 // the system config which affect range split boundaries.
 func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) {
+	if !s.cfg.SpanConfigsDisabled && spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) {
+		return // nothing to do
+	}
+
 	ctx := s.AnnotateCtx(context.Background())
 	s.computeInitialMetrics.Do(func() {
 		// Metrics depend in part on the system config. Compute them as soon as we
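The early return added to systemGossipUpdate above gates gossip-driven config application on two signals. A minimal standalone sketch of that predicate, with hypothetical names and not part of this patch:

// applyGossipUpdate distills the check in (*Store).systemGossipUpdate:
// gossip-driven system config updates are only applied when span configs
// are disabled outright (the DisableSpanConfigs server arg) or the
// spanconfig.store.enabled cluster setting is off; otherwise the span
// configs infrastructure is the source of truth and the callback is a
// no-op.
package main

import "fmt"

func applyGossipUpdate(spanConfigsDisabled, enabledSetting bool) bool {
	return spanConfigsDisabled || !enabledSetting
}

func main() {
	fmt.Println(applyGossipUpdate(false, true))  // false: span configs apply instead
	fmt.Println(applyGossipUpdate(true, true))   // true: infrastructure disabled
	fmt.Println(applyGossipUpdate(false, false)) // true: setting switched off
}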
diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go
index 2760897ea23e..5fa0b0d719ff 100644
--- a/pkg/kv/kvserver/store_test.go
+++ b/pkg/kv/kvserver/store_test.go
@@ -57,7 +57,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
-	"github.com/gogo/protobuf/proto"
 	"github.com/kr/pretty"
 	"github.com/stretchr/testify/require"
 	raft "go.etcd.io/etcd/raft/v3"
@@ -299,7 +298,7 @@ func TestIterateIDPrefixKeys(t *testing.T) {
 	stopper.AddCloser(eng)
 
 	seed := randutil.NewPseudoSeed()
-	//const seed = -1666367124291055473
+	// const seed = -1666367124291055473
 	t.Logf("seed is %d", seed)
 	rng := rand.New(rand.NewSource(seed))
 
@@ -1277,69 +1276,6 @@ func TestStoreReplicasByKey(t *testing.T) {
 	}
 }
 
-// TestStoreSetRangesMaxBytes creates a set of ranges via splitting
-// and then sets the config zone to a custom max bytes value to
-// verify the ranges' max bytes are updated appropriately.
-func TestStoreSetRangesMaxBytes(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-	ctx := context.Background()
-	stopper := stop.NewStopper()
-	defer stopper.Stop(ctx)
-	cfg := TestStoreConfig(nil)
-	cfg.TestingKnobs.DisableMergeQueue = true
-	store := createTestStoreWithConfig(ctx, t, stopper,
-		testStoreOpts{
-			// This test was written before test stores could start with more than one
-			// range and was not adapted.
-			createSystemRanges: false,
-		},
-		&cfg)
-
-	baseID := keys.TestingUserDescID(0)
-	testData := []struct {
-		repl        *Replica
-		expMaxBytes int64
-	}{
-		{store.LookupReplica(roachpb.RKeyMin),
-			store.cfg.DefaultSpanConfig.RangeMaxBytes},
-		{splitTestRange(store, roachpb.RKey(keys.SystemSQLCodec.TablePrefix(baseID)), t),
-			1 << 20},
-		{splitTestRange(store, roachpb.RKey(keys.SystemSQLCodec.TablePrefix(baseID+1)), t),
-			store.cfg.DefaultSpanConfig.RangeMaxBytes},
-		{splitTestRange(store, roachpb.RKey(keys.SystemSQLCodec.TablePrefix(baseID+2)), t),
-			2 << 20},
-	}
-
-	// Set zone configs.
-	zoneA := zonepb.DefaultZoneConfig()
-	zoneA.RangeMaxBytes = proto.Int64(1 << 20)
-	config.TestingSetZoneConfig(config.SystemTenantObjectID(baseID), zoneA)
-
-	zoneB := zonepb.DefaultZoneConfig()
-	zoneB.RangeMaxBytes = proto.Int64(2 << 20)
-	config.TestingSetZoneConfig(config.SystemTenantObjectID(baseID+2), zoneB)
-
-	// Despite faking the zone configs, we still need to have a system config
-	// entry so that the store picks up the new zone configs. This new system
-	// config needs to be non-empty so that it differs from the initial value
-	// which triggers the system config callback to be run.
-	sysCfg := &config.SystemConfigEntries{}
-	sysCfg.Values = []roachpb.KeyValue{{Key: roachpb.Key("a")}}
-	if err := store.Gossip().AddInfoProto(gossip.KeySystemConfig, sysCfg, 0); err != nil {
-		t.Fatal(err)
-	}
-
-	testutils.SucceedsSoon(t, func() error {
-		for _, test := range testData {
-			if mb := test.repl.GetMaxBytes(); mb != test.expMaxBytes {
-				return errors.Errorf("range max bytes values did not change to %d; got %d", test.expMaxBytes, mb)
-			}
-		}
-		return nil
-	})
-}
-
 // TestStoreResolveWriteIntent adds a write intent and then verifies
 // that a put returns success and aborts intent's txn in the event the
 // pushee has lower priority. Otherwise, verifies that the put blocks
diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go
index 9c693b93f637..4ada9924f84e 100644
--- a/pkg/kv/kvserver/testing_knobs.go
+++ b/pkg/kv/kvserver/testing_knobs.go
@@ -374,6 +374,9 @@ type StoreTestingKnobs struct {
 	// UseSystemConfigSpanForQueues uses the system config span infrastructure
 	// for internal queues (as opposed to the span configs infrastructure). This
 	// is used only for (old) tests written with the system config span in mind.
+	//
+	// TODO(irfansharif): Get rid of this knob, maybe by first moving
+	// DisableSpanConfigs into a testing knob instead of a server arg.
 	UseSystemConfigSpanForQueues bool
 }
 
diff --git a/pkg/roachpb/span_config.proto b/pkg/roachpb/span_config.proto
index 762a01887fae..ca0277bba56c 100644
--- a/pkg/roachpb/span_config.proto
+++ b/pkg/roachpb/span_config.proto
@@ -14,9 +14,7 @@ option go_package = "roachpb";
 
 import "roachpb/data.proto";
 import "gogoproto/gogo.proto";
-
-// TODO(irfansharif): We could have the proto definitions in pkg/config/zonepb
-// use these messages instead of duplicating everything.
+import "util/hlc/timestamp.proto";
 
 // GCPolicy dictates the garbage collection policy to apply over a given span.
 // It parallels the definition found in zonepb/zone.proto.
@@ -28,6 +26,24 @@ message GCPolicy {
   // before garbage collection. A value <= 0 means older versions are never
   // GC-ed.
   int32 ttl_seconds = 1 [(gogoproto.customname) = "TTLSeconds"];
+
+  // ProtectionPolicies is a list of policies that dictate GC behavior for a
+  // range (in conjunction with the GC TTL). A ProtectionPolicy can be used
+  // to indicate a timestamp above which GC should not run, regardless of the
+  // GC TTL. The data it applies over is guaranteed to not be GC-ed provided it
+  // wasn't GC-ed before the config applied.
+  repeated ProtectionPolicy protection_policies = 2 [(gogoproto.nullable) = false];
+}
+
+// ProtectionPolicy dictates a protection policy against garbage collection that
+// applies over a given span.
+message ProtectionPolicy {
+  option (gogoproto.equal) = true;
+  option (gogoproto.populate) = true;
+
+  // ProtectedTimestamp is a timestamp above which GC should not run, regardless
+  // of the GC TTL.
+  util.hlc.Timestamp protected_timestamp = 1 [(gogoproto.nullable) = false];
 }
 
 // Constraint constrains the stores that a replica can be stored on. It
@@ -85,8 +101,8 @@ message LeasePreference {
   repeated Constraint constraints = 1 [(gogoproto.nullable) = false];
 }
 
-// SpanConfig holds the configuration that applies to a given keyspan. It
-// parallels the definition found in zonepb/zone.proto.
+// SpanConfig holds the configuration that applies to a given keyspan. It is a
+// superset of the fields found in zonepb/zone.proto.
 message SpanConfig {
   option (gogoproto.equal) = true;
 
@@ -151,6 +167,39 @@ message SpanConfigEntry {
   SpanConfig config = 2 [(gogoproto.nullable) = false];
 };
 
+// SystemSpanConfig is a system installed configuration that may apply to
+// multiple spans.
+message SystemSpanConfig {
+  option (gogoproto.equal) = true;
+
+  // ProtectionPolicies is a list of policies which protect data from being
+  // GC-ed.
+  repeated ProtectionPolicy protection_policies = 1 [(gogoproto.nullable) = false];
+}
+
+// SystemSpanConfigTarget is used to specify the target of a SystemSpanConfig.
+message SystemSpanConfigTarget {
+  // TenantID indicates the tenant ID of the logical cluster being targeted.
+  // For secondary tenants this field is left unset. For the host we can use
+  // this field to protect a specific secondary tenant.
+  //
+  // TODO(arul): Ensure that secondary tenants don't populate this field when
+  // we make use of these in the RPC.
+  roachpb.TenantID tenant_id = 1 [(gogoproto.customname) = "TenantID", (gogoproto.nullable) = true];
+}
+
+// SystemSpanConfigEntry is a SystemSpanConfigTarget and its corresponding
+// SystemSpanConfig.
+message SystemSpanConfigEntry {
+  // SystemSpanConfigTarget represents the target over which the config is said
+  // to apply.
+  SystemSpanConfigTarget system_span_config_target = 1 [(gogoproto.nullable) = false];
+
+  // SystemSpanConfig is the config that applies.
+  SystemSpanConfig system_span_config = 2 [(gogoproto.nullable) = false];
+}
+
 // GetSpanConfigsRequest is used to fetch the span configurations over the
 // specified keyspans.
 message GetSpanConfigsRequest {
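For illustration, a sketch of how the Go types generated from GCPolicy and ProtectionPolicy might be consulted. The canGC helper and its deliberately simplified eligibility rule are assumptions for this example, not CockroachDB's actual GC heuristic:

package main

import (
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// canGC is a hypothetical helper: a version becomes eligible for GC once
// it is older than the TTL, but never if its write timestamp is at or
// above a policy's protected timestamp.
func canGC(pol roachpb.GCPolicy, writeTS, now hlc.Timestamp) bool {
	for _, p := range pol.ProtectionPolicies {
		if p.ProtectedTimestamp.LessEq(writeTS) {
			return false
		}
	}
	return writeTS.WallTime+int64(pol.TTLSeconds)*time.Second.Nanoseconds() < now.WallTime
}

func main() {
	now := hlc.Timestamp{WallTime: time.Now().UnixNano()}
	old := hlc.Timestamp{WallTime: now.WallTime - (24 * time.Hour).Nanoseconds()}
	pol := roachpb.GCPolicy{
		TTLSeconds:         90, // GC-able 90 seconds after being written
		ProtectionPolicies: []roachpb.ProtectionPolicy{{ProtectedTimestamp: old}},
	}
	fmt.Println(canGC(pol, old, now)) // false: protected despite exceeding the TTL
}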
diff --git a/pkg/sql/catalog/descpb/structured.proto b/pkg/sql/catalog/descpb/structured.proto
index 29f7deb8bd81..491835e2c6df 100644
--- a/pkg/sql/catalog/descpb/structured.proto
+++ b/pkg/sql/catalog/descpb/structured.proto
@@ -1108,6 +1108,7 @@ message TableDescriptor {
       (gogoproto.customname) = "JobID",
       deprecated = true];
   }
+  // Before 22.1:
   // The schema elements that have been dropped and whose underlying
   // data needs to be gc-ed. These schema elements have already transitioned
   // through the drop state machine when they were in the above mutations
@@ -1115,8 +1116,12 @@ message TableDescriptor {
   // can be reused. This list is separate because mutations can
   // lie in this list for a long time (gc deadline) and should not block
   // the execution of other schema changes on the table.
+  //
+  // Since 22.1, this field is deprecated and no longer maintained.
+  // The index GC job still removes mutations it finds in this list.
   repeated GCDescriptorMutation gc_mutations = 33 [(gogoproto.nullable) = false,
-    (gogoproto.customname) = "GCMutations"];
+    (gogoproto.customname) = "GCMutations",
+    deprecated = true];
 
   optional string create_query = 34 [(gogoproto.nullable) = false];
 
diff --git a/pkg/sql/catalog/descriptor.go b/pkg/sql/catalog/descriptor.go
index 2f7943667a47..0f8ac459968e 100644
--- a/pkg/sql/catalog/descriptor.go
+++ b/pkg/sql/catalog/descriptor.go
@@ -533,8 +533,6 @@ type TableDescriptor interface {
 	MakePublic() TableDescriptor
 	// AllMutations returns all of the table descriptor's mutations.
 	AllMutations() []Mutation
-	// GetGCMutations returns the table descriptor's GC mutations.
-	GetGCMutations() []descpb.TableDescriptor_GCDescriptorMutation
 	// GetMutationJobs returns the table descriptor's mutation jobs.
 	GetMutationJobs() []descpb.TableDescriptor_MutationJob
 
diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go
index c33098b8483b..07a972323db4 100644
--- a/pkg/sql/catalog/lease/lease_test.go
+++ b/pkg/sql/catalog/lease/lease_test.go
@@ -1118,14 +1118,9 @@ INSERT INTO t.kv VALUES ('a', 'b');
 	}
 
 	testutils.SucceedsSoon(t, func() error {
-		if tableDesc := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "kv"); len(tableDesc.GetGCMutations()) != 0 {
-			return errors.Errorf("%d gc mutations remaining", len(tableDesc.GetGCMutations()))
-		}
-		return nil
+		return tests.CheckKeyCountE(t, kvDB, tableSpan, 2)
 	})
 
-	tests.CheckKeyCount(t, kvDB, tableSpan, 2)
-
 	// TODO(erik, vivek): Transactions using old descriptors should fail and
 	// rollback when the index keys have been removed by ClearRange
 	// and the consistency issue is resolved. See #31563.
diff --git a/pkg/sql/catalog/tabledesc/structured.go b/pkg/sql/catalog/tabledesc/structured.go
index 9fa4ff77453d..ba17203ec493 100644
--- a/pkg/sql/catalog/tabledesc/structured.go
+++ b/pkg/sql/catalog/tabledesc/structured.go
@@ -63,11 +63,6 @@ var ErrMissingColumns = errors.New("table must contain at least 1 column")
 // ErrMissingPrimaryKey indicates a table with no primary key.
 var ErrMissingPrimaryKey = errors.New("table must contain a primary key")
 
-// ErrIndexGCMutationsList is returned by FindIndexWithID to signal that the
-// index with the given ID does not have a descriptor and is in the garbage
-// collected mutations list.
-var ErrIndexGCMutationsList = errors.New("index in GC mutations list")
-
 // PostDeserializationTableDescriptorChanges are a set of booleans to indicate
 // which types of upgrades or fixes occurred when filling in the descriptor
 // after deserialization.
diff --git a/pkg/sql/catalog/tabledesc/table_desc.go b/pkg/sql/catalog/tabledesc/table_desc.go
index f3b4d254ccbc..ea65b8579aef 100644
--- a/pkg/sql/catalog/tabledesc/table_desc.go
+++ b/pkg/sql/catalog/tabledesc/table_desc.go
@@ -334,11 +334,6 @@ func (desc *wrapper) FindIndexWithID(id descpb.IndexID) (catalog.Index, error) {
 	}); idx != nil {
 		return idx, nil
 	}
-	for _, m := range desc.GCMutations {
-		if m.IndexID == id {
-			return nil, ErrIndexGCMutationsList
-		}
-	}
 	return nil, errors.Errorf("index-id \"%d\" does not exist", id)
 }
 
diff --git a/pkg/sql/gcjob/descriptor_utils.go b/pkg/sql/gcjob/descriptor_utils.go
index 81f98cc9e2a2..b5bb960810ad 100644
--- a/pkg/sql/gcjob/descriptor_utils.go
+++ b/pkg/sql/gcjob/descriptor_utils.go
@@ -23,15 +23,15 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 )
 
+// updateDescriptorGCMutations removes the GCMutation for the given
+// index ID. We no longer populate this field, but we still search it
+// to remove existing entries.
 func updateDescriptorGCMutations(
 	ctx context.Context,
 	execCfg *sql.ExecutorConfig,
 	tableID descpb.ID,
 	garbageCollectedIndexID descpb.IndexID,
 ) error {
-	log.Infof(ctx, "updating GCMutations for table %d after removing index %d",
-		tableID, garbageCollectedIndexID)
-	// Remove the mutation from the table descriptor.
 	return sql.DescsTxn(ctx, execCfg, func(
 		ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
 	) error {
@@ -39,18 +39,26 @@ func updateDescriptorGCMutations(
 		if err != nil {
 			return err
 		}
+		found := false
 		for i := 0; i < len(tbl.GCMutations); i++ {
 			other := tbl.GCMutations[i]
 			if other.IndexID == garbageCollectedIndexID {
 				tbl.GCMutations = append(tbl.GCMutations[:i], tbl.GCMutations[i+1:]...)
+				found = true
 				break
 			}
 		}
-		b := txn.NewBatch()
-		if err := descsCol.WriteDescToBatch(ctx, false /* kvTrace */, tbl, b); err != nil {
-			return err
+		if found {
+			log.Infof(ctx, "updating GCMutations for table %d after removing index %d",
+				tableID, garbageCollectedIndexID)
+			// Remove the mutation from the table descriptor.
+			b := txn.NewBatch()
+			if err := descsCol.WriteDescToBatch(ctx, false /* kvTrace */, tbl, b); err != nil {
+				return err
+			}
+			return txn.Run(ctx, b)
 		}
-		return txn.Run(ctx, b)
+		return nil
 	})
 }
diff --git a/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic b/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic
index bf58bbfb2217..da0d5688be4d 100644
--- a/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic
+++ b/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic
@@ -222,7 +222,7 @@ WHERE message NOT LIKE '%Z/%' AND message NOT LIKE 'querying next range at%' AND operation != 'dist sender send'
 ----
 batch flow coordinator Del /NamespaceTable/30/1/56/57/"kv"/4/1
-batch flow coordinator Put /Table/3/1/58/2/1 -> table: parent_id:56 unexposed_parent_schema_id:57 columns: nullable:false hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns: nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD created_explicitly:false encoding_type:1 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false > next_index_id:3 privileges: users: users: owner_proto:"root" version:2 > next_mutation_id:3 format_version:3 state:DROP offline_reason:"" view_query:"" is_materialized_view:false new_schema_change_job_id:0 drop_time:... replacement_of: > audit_mode:DISABLED drop_job_id:0 gc_mutations: create_query:"" create_as_of_time:<...> temporary:false partition_all_by:false >
+batch flow coordinator Put /Table/3/1/58/2/1 -> table: parent_id:56 unexposed_parent_schema_id:57 columns: nullable:false hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns: nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD created_explicitly:false encoding_type:1 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false > next_index_id:3 privileges: users: users: owner_proto:"root" version:2 > next_mutation_id:3 format_version:3 state:DROP offline_reason:"" view_query:"" is_materialized_view:false new_schema_change_job_id:0 drop_time:... replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"" create_as_of_time:<...> temporary:false partition_all_by:false >
 exec stmt rows affected: 0
 
 # Check that session tracing does not inhibit the fast path for inserts &
diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go
index 548bfb76d4b4..36433e95f67a 100644
--- a/pkg/sql/schema_changer.go
+++ b/pkg/sql/schema_changer.go
@@ -1091,15 +1091,6 @@ func (sc *SchemaChanger) done(ctx context.Context) error {
 		}
 		isRollback = m.IsRollback()
 		if idx := m.AsIndex(); m.Dropped() && idx != nil {
-			// how we keep track of dropped index names (for, e.g., zone config
-			// lookups), even though in the absence of a GC job there's nothing to
-			// clean them up.
-			scTable.GCMutations = append(
-				scTable.GCMutations,
-				descpb.TableDescriptor_GCDescriptorMutation{
-					IndexID: idx.GetID(),
-				})
-
 			description := sc.job.Payload().Description
 			if isRollback {
 				description = "ROLLBACK of " + description
@@ -1108,7 +1099,6 @@ func (sc *SchemaChanger) done(ctx context.Context) error {
 			if err := sc.createIndexGCJob(ctx, idx.GetID(), txn, description); err != nil {
 				return err
 			}
-
 		}
 		if constraint := m.AsConstraint(); constraint != nil && constraint.Adding() {
 			if constraint.IsForeignKey() && constraint.ForeignKey().Validity == descpb.ConstraintValidity_Unvalidated {
diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go
index a95ea329f2bf..55d32f72d673 100644
--- a/pkg/sql/schema_changer_test.go
+++ b/pkg/sql/schema_changer_test.go
@@ -1539,15 +1539,6 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
 		return nil
 	})
 
-	tableDesc = catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test")
-
-	// There is still a DROP INDEX mutation waiting for GC.
-	if e := 1; len(tableDesc.GetGCMutations()) != e {
-		t.Fatalf("the table has %d instead of %d GC mutations", len(tableDesc.GetGCMutations()), e)
-	} else if m := tableDesc.GetGCMutations()[0]; m.IndexID != 2 && m.DropTime == 0 && m.JobID == 0 {
-		t.Fatalf("unexpected GC mutation %v", m)
-	}
-
 	// There is still some garbage index data that needs to be purged. All the
 	// rows from k = 0 to k = chunkSize - 1 have index values.
 	numGarbageValues := chunkSize
@@ -1566,20 +1557,12 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
 	// above garbage left behind.
 	atomic.StoreUint32(&enableAsyncSchemaChanges, 1)
 
+	// No garbage left behind.
 	testutils.SucceedsSoon(t, func() error {
-		tableDesc = catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test")
-		if len(tableDesc.GetGCMutations()) > 0 {
-			return errors.Errorf("%d GC mutations remaining", len(tableDesc.GetGCMutations()))
-		}
-		return nil
+		numGarbageValues = 0
+		return sqltestutils.CheckTableKeyCount(ctx, kvDB, 1, maxValue+1+numGarbageValues)
 	})
 
-	// No garbage left behind.
-	numGarbageValues = 0
-	if err := sqltestutils.CheckTableKeyCount(ctx, kvDB, 1, maxValue+1+numGarbageValues); err != nil {
-		t.Fatal(err)
-	}
-
 	// A new attempt cleans up a chunk of data.
 	if attempts != expectedAttempts+1 {
 		t.Fatalf("%d chunk ops, despite allowing only (schema change + reverse) = %d",
 			attempts, expectedAttempts)
@@ -1909,20 +1892,12 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT8);
 		t.Fatal(err)
 	}
 
-	testutils.SucceedsSoon(t, func() error {
-		tableDesc = catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test")
-		if len(tableDesc.GetGCMutations()) > 0 {
-			return errors.Errorf("%d gc mutations remaining", len(tableDesc.GetGCMutations()))
-		}
-		return nil
-	})
-
 	ctx := context.Background()
 
-	// Check that the number of k-v pairs is accurate.
-	if err := sqltestutils.CheckTableKeyCount(ctx, kvDB, 3, maxValue); err != nil {
-		t.Fatal(err)
-	}
+	testutils.SucceedsSoon(t, func() error {
+		// Check that the number of k-v pairs is accurate.
+		return sqltestutils.CheckTableKeyCount(ctx, kvDB, 3, maxValue)
+	})
 
 	// State of jobs table
 	skip.WithIssue(t, 51796, "TODO(pbardea): The following fails due to causes seemingly unrelated to GC")
@@ -2274,7 +2249,7 @@ func TestVisibilityDuringPrimaryKeyChange(t *testing.T) {
 	if _, err := sqlDB.Exec(`
 CREATE DATABASE t;
 CREATE TABLE t.test (x INT PRIMARY KEY, y INT NOT NULL, z INT, INDEX i (z));
-INSERT INTO t.test VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3); 
+INSERT INTO t.test VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3);
 `); err != nil {
 		t.Fatal(err)
 	}
@@ -2851,11 +2826,11 @@ func TestPrimaryKeyChangeKVOps(t *testing.T) {
 	if _, err := sqlDB.Exec(`
 CREATE DATABASE t;
 CREATE TABLE t.test (
-	x INT PRIMARY KEY, 
-	y INT NOT NULL, 
-	z INT, 
-	a INT, 
-	b INT, 
+	x INT PRIMARY KEY,
+	y INT NOT NULL,
+	z INT,
+	a INT,
+	b INT,
 	c INT,
 	FAMILY (x), FAMILY (y), FAMILY (z, a), FAMILY (b), FAMILY (c)
 )
@@ -3185,7 +3160,7 @@ func TestMultiplePrimaryKeyChanges(t *testing.T) {
 	if _, err := sqlDB.Exec(`
 CREATE DATABASE t;
 CREATE TABLE t.test (x INT NOT NULL, y INT NOT NULL, z INT NOT NULL, w int, INDEX i (w));
-INSERT INTO t.test VALUES (1, 1, 1, 1), (2, 2, 2, 2), (3, 3, 3, 3); 
+INSERT INTO t.test VALUES (1, 1, 1, 1), (2, 2, 2, 2), (3, 3, 3, 3);
 `); err != nil {
 		t.Fatal(err)
 	}
@@ -6017,103 +5992,6 @@ ALTER TABLE t.test2 ADD FOREIGN KEY (k) REFERENCES t.test;
 	}
 }
 
-// TestOrphanedGCMutationsRemoved tests that if a table descriptor has a
-// GCMutations which references a job that does not exist anymore, that it will
-// eventually be cleaned up anyway. One way this can arise is when a table
-// was backed up right after an index deletion.
-func TestOrphanedGCMutationsRemoved(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-	skip.WithIssue(t, 51796, "TODO (lucy): get rid of this test once GCMutations goes away")
-	params, _ := tests.CreateTestServerParams()
-	const chunkSize = 200
-	// Disable synchronous schema change processing so that the mutations get
-	// processed asynchronously.
-	var enableAsyncSchemaChanges uint32
-	params.Knobs = base.TestingKnobs{
-		SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
-			BackfillChunkSize: chunkSize,
-		},
-	}
-	s, sqlDB, kvDB := serverutils.StartServer(t, params)
-	defer s.Stopper().Stop(context.Background())
-
-	// Disable strict GC TTL enforcement because we're going to shove a zero-value
-	// TTL into the system with AddImmediateGCZoneConfig.
-	defer sqltestutils.DisableGCTTLStrictEnforcement(t, sqlDB)()
-
-	retryOpts := retry.Options{
-		InitialBackoff: 20 * time.Millisecond,
-		MaxBackoff:     200 * time.Millisecond,
-		Multiplier:     2,
-	}
-
-	// Create a k-v table.
-	if _, err := sqlDB.Exec(`
-CREATE DATABASE t;
-CREATE TABLE t.test (k INT PRIMARY KEY, v INT8);
-`); err != nil {
-		t.Fatal(err)
-	}
-	if _, err := sqlDB.Exec(`CREATE INDEX t_v ON t.test(v)`); err != nil {
-		t.Fatal(err)
-	}
-
-	// Add some data.
-	const maxValue = chunkSize + 1
-	if err := sqltestutils.BulkInsertIntoTable(sqlDB, maxValue); err != nil {
-		t.Fatal(err)
-	}
-
-	tableDesc := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test")
-	// Wait until indexes are created.
-	for r := retry.Start(retryOpts); r.Next(); {
-		tableDesc = catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test")
-		if len(tableDesc.PublicNonPrimaryIndexes()) == 1 {
-			break
-		}
-	}
-
-	if _, err := sqlDB.Exec(`DROP INDEX t.t_v`); err != nil {
-		t.Fatal(err)
-	}
-
-	tableDesc = catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test")
-	if e := 1; e != len(tableDesc.GetGCMutations()) {
-		t.Fatalf("e = %d, v = %d", e, len(tableDesc.GetGCMutations()))
-	}
-
-	// Delete the associated job.
-	jobID := tableDesc.GetGCMutations()[0].JobID
-	if _, err := sqlDB.Exec(fmt.Sprintf("DELETE FROM system.jobs WHERE id=%d", jobID)); err != nil {
-		t.Fatal(err)
-	}
-
-	// Ensure the GCMutations has not yet been completed.
-	tableDesc = catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test")
-	if e := 1; e != len(tableDesc.GetGCMutations()) {
-		t.Fatalf("e = %d, v = %d", e, len(tableDesc.GetGCMutations()))
-	}
-
-	// Enable async schema change processing for purged schema changes.
-	atomic.StoreUint32(&enableAsyncSchemaChanges, 1)
-
-	// Add immediate GC TTL to allow index creation purge to complete.
-	if _, err := sqltestutils.AddImmediateGCZoneConfig(sqlDB, tableDesc.GetID()); err != nil {
-		t.Fatal(err)
-	}
-
-	// Ensure that GC mutations that cannot find their job will eventually be
-	// cleared.
-	testutils.SucceedsSoon(t, func() error {
-		tableDesc = catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test")
-		if len(tableDesc.GetGCMutations()) > 0 {
-			return errors.Errorf("%d gc mutations remaining", len(tableDesc.GetGCMutations()))
-		}
-		return nil
-	})
-}
-
 // TestMultipleRevert starts a schema change then cancels it. After the canceled
 // job, after reversing the mutations the job is set up to throw an error so
 // that mutations are attempted to be reverted again. The mutation shouldn't be
@@ -6613,7 +6491,7 @@ func TestFailureToMarkCanceledReversalLeadsToCanceledStatus(t *testing.T) {
 		jobsErrGroup.Go(func() error {
 			return testutils.SucceedsSoonError(func() error {
 				return sqlDB.QueryRow(`
-SELECT job_id FROM crdb_internal.jobs 
+SELECT job_id FROM crdb_internal.jobs
 WHERE description LIKE '%` + idxName + `%'`).Scan(&jobIDs[i])
 			})
 		})
@@ -6701,7 +6579,7 @@ func TestCancelMultipleQueued(t *testing.T) {
 		jobsErrGroup.Go(func() error {
 			return testutils.SucceedsSoonError(func() error {
 				return sqlDB.QueryRow(`
-SELECT job_id FROM crdb_internal.jobs 
+SELECT job_id FROM crdb_internal.jobs
 WHERE description LIKE '%` + idxName + `%'`).Scan(&jobIDs[i])
 			})
 		})
diff --git a/pkg/sql/tests/data.go b/pkg/sql/tests/data.go
index 37e41865e2f9..35f5c6854470 100644
--- a/pkg/sql/tests/data.go
+++ b/pkg/sql/tests/data.go
@@ -19,17 +19,28 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/errors"
 )
 
 // CheckKeyCount checks that the number of keys in the provided span matches
 // numKeys.
 func CheckKeyCount(t *testing.T, kvDB *kv.DB, span roachpb.Span, numKeys int) {
 	t.Helper()
-	if kvs, err := kvDB.Scan(context.TODO(), span.Key, span.EndKey, 0); err != nil {
+	if err := CheckKeyCountE(t, kvDB, span, numKeys); err != nil {
 		t.Fatal(err)
+	}
+}
+
+// CheckKeyCountE returns an error if the number of keys in the
+// provided span does not match numKeys.
+func CheckKeyCountE(t *testing.T, kvDB *kv.DB, span roachpb.Span, numKeys int) error {
+	t.Helper()
+	if kvs, err := kvDB.Scan(context.TODO(), span.Key, span.EndKey, 0); err != nil {
+		return err
 	} else if l := numKeys; len(kvs) != l {
-		t.Fatalf("expected %d key value pairs, but got %d", l, len(kvs))
+		return errors.Newf("expected %d key value pairs, but got %d", l, len(kvs))
 	}
+	return nil
 }
 
 // CreateKVTable creates a basic table named t. that stores key/value
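The error-returning variant composes with testutils.SucceedsSoon, as in the lease_test.go change above. A usage sketch, not part of the patch; the waitForKeyCount helper is hypothetical, and kvDB plus the table's span are assumed to come from the enclosing test's setup:

// waitForKeyCount polls until the span holds exactly numKeys keys,
// e.g. once GC of dropped-index data has caught up, instead of
// failing the test on the first check.
func waitForKeyCount(t *testing.T, kvDB *kv.DB, tableSpan roachpb.Span, numKeys int) {
	testutils.SucceedsSoon(t, func() error {
		return tests.CheckKeyCountE(t, kvDB, tableSpan, numKeys)
	})
}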