Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: fix deadlock in consistency queue #25456

Merged
merged 1 commit into from
May 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions pkg/storage/consistency_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,30 @@ func (q *consistencyQueue) process(
if q.interval() <= 0 {
return nil
}

// Call setQueueLastProcessed because the consistency checker targets a much
// longer cycle time than other queues. That it ignores errors is likely a
// historical accident that should be revisited.
if err := repl.setQueueLastProcessed(ctx, q.name, repl.store.Clock().Now()); err != nil {
log.VErrEventf(ctx, 2, "failed to update last processed time: %v", err)
}

req := roachpb.CheckConsistencyRequest{}
if _, pErr := repl.CheckConsistency(ctx, req); pErr != nil {
_, shouldQuiesce := <-repl.store.Stopper().ShouldQuiesce()
var shouldQuiesce bool
select {
case <-repl.store.Stopper().ShouldQuiesce():
shouldQuiesce = true
default:
}

if !shouldQuiesce || !grpcutil.IsClosedConnection(pErr.GoError()) {
// Suppress noisy errors about closed GRPC connections when the
// server is quiescing.
log.Error(ctx, pErr.GoError())
return pErr.GoError()
}
}
// Update the last processed time for this queue.
if err := repl.setQueueLastProcessed(ctx, q.name, repl.store.Clock().Now()); err != nil {
log.VErrEventf(ctx, 2, "failed to update last processed time: %v", err)
}
return nil
}

Expand Down
30 changes: 30 additions & 0 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9267,6 +9267,36 @@ func TestReplicaRecomputeStats(t *testing.T) {
}
}

// TestConsistencyQueueErrorFromCheckConsistency exercises the case in which
// the queue receives an error from CheckConsistency.
func TestConsistenctQueueErrorFromCheckConsistency(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

cfg := TestStoreConfig(nil)
cfg.TestingKnobs = StoreTestingKnobs{
TestingRequestFilter: func(ba roachpb.BatchRequest) *roachpb.Error {
if _, ok := ba.GetArg(roachpb.ComputeChecksum); ok {
return roachpb.NewErrorf("boom")
}
return nil
},
}
tc := testContext{}
tc.StartWithStoreConfig(t, stopper, cfg)

for i := 0; i < 2; i++ {
// Do this twice because it used to deadlock. See #25456.
sysCfg, _ := tc.store.Gossip().GetSystemConfig()
if err := tc.store.consistencyQueue.process(ctx, tc.repl, sysCfg); !testutils.IsError(err, "boom") {
t.Fatal(err)
}
}
}

// TestReplicaLocalRetries verifies local retry logic for transactional
// and non transactional batches. Verifies the timestamp cache is updated
// to reflect the timestamp at which retried batches are executed.
Expand Down