diff --git a/pkg/storage/consistency_queue.go b/pkg/storage/consistency_queue.go index 927ca4e7dd32..6fd4f03f6708 100644 --- a/pkg/storage/consistency_queue.go +++ b/pkg/storage/consistency_queue.go @@ -102,19 +102,30 @@ func (q *consistencyQueue) process( if q.interval() <= 0 { return nil } + + // Call setQueueLastProcessed because the consistency checker targets a much + // longer cycle time than other queues. That it ignores errors is likely a + // historical accident that should be revisited. + if err := repl.setQueueLastProcessed(ctx, q.name, repl.store.Clock().Now()); err != nil { + log.VErrEventf(ctx, 2, "failed to update last processed time: %v", err) + } + req := roachpb.CheckConsistencyRequest{} if _, pErr := repl.CheckConsistency(ctx, req); pErr != nil { - _, shouldQuiesce := <-repl.store.Stopper().ShouldQuiesce() + var shouldQuiesce bool + select { + case <-repl.store.Stopper().ShouldQuiesce(): + shouldQuiesce = true + default: + } + if !shouldQuiesce || !grpcutil.IsClosedConnection(pErr.GoError()) { // Suppress noisy errors about closed GRPC connections when the // server is quiescing. log.Error(ctx, pErr.GoError()) + return pErr.GoError() } } - // Update the last processed time for this queue. - if err := repl.setQueueLastProcessed(ctx, q.name, repl.store.Clock().Now()); err != nil { - log.VErrEventf(ctx, 2, "failed to update last processed time: %v", err) - } return nil } diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 7cf76e9ab1ad..785ca9ebe420 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -9050,6 +9050,36 @@ func TestReplicaRecomputeStats(t *testing.T) { } } +// TestConsistencyQueueErrorFromCheckConsistency exercises the case in which +// the queue receives an error from CheckConsistency. +func TestConsistenctQueueErrorFromCheckConsistency(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + cfg := TestStoreConfig(nil) + cfg.TestingKnobs = StoreTestingKnobs{ + TestingRequestFilter: func(ba roachpb.BatchRequest) *roachpb.Error { + if _, ok := ba.GetArg(roachpb.ComputeChecksum); ok { + return roachpb.NewErrorf("boom") + } + return nil + }, + } + tc := testContext{} + tc.StartWithStoreConfig(t, stopper, cfg) + + for i := 0; i < 2; i++ { + // Do this twice because it used to deadlock. See #25456. + sysCfg, _ := tc.store.Gossip().GetSystemConfig() + if err := tc.store.consistencyQueue.process(ctx, tc.repl, sysCfg); !testutils.IsError(err, "boom") { + t.Fatal(err) + } + } +} + // TestReplicaLocalRetries verifies local retry logic for transactional // and non transactional batches. Verifies the timestamp cache is updated // to reflect the timestamp at which retried batches are executed.