Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
storage: fix deadlock in consistency queue
Browse files Browse the repository at this point in the history
When `CheckConsistency` returns an error, the queue checks whether the
store is draining to decide whether the error is worth logging.

Unfortunately this check was incorrect and would block until the store
actually started draining.

A toy example of this problem is below (this will deadlock). The dual
return form of chan receive isn't non-blocking -- the second parameter
indicates whether the received value corresponds to a closing of the
channel.

Switch to a `select` instead.

```
package main

import (
	"fmt"
)

func main() {
	ch := make(chan struct{})
	_, ok := <-ch
	fmt.Println(ok)
}
```

Touches cockroachdb#21824.

Release note (bug fix): Prevent the consistency checker from
deadlocking. This would previously manifest itself as a steady number of
replicas queued for consistency checking on one or more nodes and would
resolve by restarting the affected nodes.
tbg committed May 14, 2018

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 92fdda1 commit 4c13f6d
Showing 2 changed files with 44 additions and 5 deletions.
21 changes: 16 additions & 5 deletions pkg/storage/consistency_queue.go
Original file line number Diff line number Diff line change
@@ -102,19 +102,30 @@ func (q *consistencyQueue) process(
if q.interval() <= 0 {
return nil
}

// Update the last processed time for this queue. We do this before actually
// processing because this queue will do it even in the case of an error
// (since a failed processing run could be caused by down followers).
if err := repl.setQueueLastProcessed(ctx, q.name, repl.store.Clock().Now()); err != nil {
log.VErrEventf(ctx, 2, "failed to update last processed time: %v", err)
}

req := roachpb.CheckConsistencyRequest{}
if _, pErr := repl.CheckConsistency(ctx, req); pErr != nil {
_, shouldQuiesce := <-repl.store.Stopper().ShouldQuiesce()
var shouldQuiesce bool
select {
case <-repl.store.Stopper().ShouldQuiesce():
shouldQuiesce = true
default:
}

if !shouldQuiesce || !grpcutil.IsClosedConnection(pErr.GoError()) {
// Suppress noisy errors about closed GRPC connections when the
// server is quiescing.
log.Error(ctx, pErr.GoError())
return pErr.GoError()
}
}
// Update the last processed time for this queue.
if err := repl.setQueueLastProcessed(ctx, q.name, repl.store.Clock().Now()); err != nil {
log.VErrEventf(ctx, 2, "failed to update last processed time: %v", err)
}
return nil
}

28 changes: 28 additions & 0 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
@@ -9267,6 +9267,34 @@ func TestReplicaRecomputeStats(t *testing.T) {
}
}

// TestConsistenctQueueErrorFromCheckConsistency exercises the case in which
// the queue receives an error from CheckConsistency.
func TestConsistenctQueueErrorFromCheckConsistency(t *testing.T) {
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

cfg := TestStoreConfig(nil)
cfg.TestingKnobs = StoreTestingKnobs{
TestingRequestFilter: func(ba roachpb.BatchRequest) *roachpb.Error {
if _, ok := ba.GetArg(roachpb.ComputeChecksum); ok {
return roachpb.NewErrorf("boom")
}
return nil
},
}
tc := testContext{}
tc.StartWithStoreConfig(t, stopper, cfg)

for i := 0; i < 2; i++ {
// Do this twice because it used to deadlock. See #25456.
sysCfg, _ := tc.store.Gossip().GetSystemConfig()
if err := tc.store.consistencyQueue.process(ctx, tc.repl, sysCfg); !testutils.IsError(err, "boom") {
t.Fatal(err)
}
}
}

// TestReplicaLocalRetries verifies local retry logic for transactional
// and non transactional batches. Verifies the timestamp cache is updated
// to reflect the timestamp at which retried batches are executed.

0 comments on commit 4c13f6d

Please sign in to comment.