From 7055cc8d96883bda2f69bb1fe8bc7512b2543132 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Mon, 14 May 2018 12:59:55 -0400 Subject: [PATCH] storage: fix deadlock in consistency queue When `CheckConsistency` returns an error, the queue checks whether the store is draining to decide whether the error is worth logging. Unfortunately this check was incorrect and would block until the store actually started draining. A toy example of this problem is below (this will deadlock). The dual return form of chan receive isn't non-blocking -- the second parameter indicates whether the received value corresponds to a closing of the channel. Switch to a `select` instead. ``` package main import ( "fmt" ) func main() { ch := make(chan struct{}) _, ok := <-ch fmt.Println(ok) } ``` Touches #21824. Release note (bug fix): Prevent the consistency checker from deadlocking. This would previously manifest itself as a steady number of replicas queued for consistency checking on one or more nodes and would resolve by restarting the affected nodes. --- pkg/storage/consistency_queue.go | 21 ++++++++++++++++----- pkg/storage/replica_test.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/pkg/storage/consistency_queue.go b/pkg/storage/consistency_queue.go index 8e2f6942686a..445a7e4c998f 100644 --- a/pkg/storage/consistency_queue.go +++ b/pkg/storage/consistency_queue.go @@ -102,19 +102,30 @@ func (q *consistencyQueue) process( if q.interval() <= 0 { return nil } + + // Call setQueueLastProcessed because the consistency checker targets a much + // longer cycle time than other queues. That it ignores errors is likely a + // historical accident that should be revisited. 
+ if err := repl.setQueueLastProcessed(ctx, q.name, repl.store.Clock().Now()); err != nil { + log.VErrEventf(ctx, 2, "failed to update last processed time: %v", err) + } + req := roachpb.CheckConsistencyRequest{} if _, pErr := repl.CheckConsistency(ctx, req); pErr != nil { - _, shouldQuiesce := <-repl.store.Stopper().ShouldQuiesce() + var shouldQuiesce bool + select { + case <-repl.store.Stopper().ShouldQuiesce(): + shouldQuiesce = true + default: + } + if !shouldQuiesce || !grpcutil.IsClosedConnection(pErr.GoError()) { // Suppress noisy errors about closed GRPC connections when the // server is quiescing. log.Error(ctx, pErr.GoError()) + return pErr.GoError() } } - // Update the last processed time for this queue. - if err := repl.setQueueLastProcessed(ctx, q.name, repl.store.Clock().Now()); err != nil { - log.VErrEventf(ctx, 2, "failed to update last processed time: %v", err) - } return nil } diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 205e00622cec..52f05797cb72 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -9267,6 +9267,36 @@ func TestReplicaRecomputeStats(t *testing.T) { } } +// TestConsistencyQueueErrorFromCheckConsistency exercises the case in which +// the queue receives an error from CheckConsistency. +func TestConsistencyQueueErrorFromCheckConsistency(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + cfg := TestStoreConfig(nil) + cfg.TestingKnobs = StoreTestingKnobs{ + TestingRequestFilter: func(ba roachpb.BatchRequest) *roachpb.Error { + if _, ok := ba.GetArg(roachpb.ComputeChecksum); ok { + return roachpb.NewErrorf("boom") + } + return nil + }, + } + tc := testContext{} + tc.StartWithStoreConfig(t, stopper, cfg) + + for i := 0; i < 2; i++ { + // Do this twice because it used to deadlock. See #25456. 
+ sysCfg, _ := tc.store.Gossip().GetSystemConfig() + if err := tc.store.consistencyQueue.process(ctx, tc.repl, sysCfg); !testutils.IsError(err, "boom") { + t.Fatal(err) + } + } +} + // TestReplicaLocalRetries verifies local retry logic for transactional // and non transactional batches. Verifies the timestamp cache is updated // to reflect the timestamp at which retried batches are executed.