*: check node decommissioned/draining state for DistSQL/consistency
The DistSQL planner and consistency queue did not take the nodes'
decommissioned or draining states into account, which in particular
could cause spurious errors when interacting with decommissioned nodes.

This patch adds convenience methods for checking node availability and
draining states, and avoids scheduling DistSQL flows on
unavailable nodes and consistency checks on unavailable/draining nodes.

Release note (bug fix): Avoid interacting with decommissioned nodes
during DistSQL planning and consistency checking.
erikgrinaker committed Jun 25, 2021
1 parent 9502826 commit 78688ea
Showing 9 changed files with 79 additions and 63 deletions.
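To make the semantics concrete before the per-file diffs, here is a small, self-contained Go sketch of the two availability predicates this commit introduces and of how their two consumers differ. All types and names below are illustrative stand-ins rather than CockroachDB's actual structs; the real implementations are in the pkg/kv/kvserver/liveness/liveness.go hunk further down.

```go
package main

import "fmt"

// toyLiveness is an illustrative stand-in for a node's liveness record.
type toyLiveness struct {
	live            bool // heartbeat is current
	draining        bool // node is draining leases and SQL connections
	decommissioning bool // operator has asked the node to leave the cluster
	decommissioned  bool // node has fully left the cluster
}

// isAvailable mirrors NodeLiveness.IsAvailable from the diff: live and not
// decommissioned. Draining/decommissioning nodes still count as available,
// since they may still hold leases.
func isAvailable(l toyLiveness) bool {
	return l.live && !l.decommissioned
}

// isAvailableNotDraining mirrors NodeLiveness.IsAvailableNotDraining: the node
// must additionally not be draining or decommissioning.
func isAvailableNotDraining(l toyLiveness) bool {
	return l.live && !l.decommissioned && !l.decommissioning && !l.draining
}

func main() {
	drainingNode := toyLiveness{live: true, draining: true}

	// DistSQL planning uses the looser check: a draining node may still be a
	// leaseholder, so flows can still be scheduled on it.
	fmt.Println("DistSQL may schedule on draining node:", isAvailable(drainingNode)) // true

	// The consistency queue uses the stricter check and skips ranges with a
	// replica on a draining/decommissioning node.
	fmt.Println("consistency queue considers it:", isAvailableNotDraining(drainingNode)) // false

	gone := toyLiveness{live: false, decommissioned: true}
	fmt.Println("either check on a decommissioned node:", isAvailable(gone), isAvailableNotDraining(gone)) // false false
}
```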
15 changes: 6 additions & 9 deletions pkg/kv/kvserver/consistency_queue.go
@@ -66,7 +66,7 @@ type consistencyQueue struct {
type consistencyShouldQueueData struct {
desc *roachpb.RangeDescriptor
getQueueLastProcessed func(ctx context.Context) (hlc.Timestamp, error)
isNodeLive func(nodeID roachpb.NodeID) (bool, error)
isNodeAvailable func(nodeID roachpb.NodeID) bool
disableLastProcessedCheck bool
interval time.Duration
}
@@ -105,12 +105,12 @@ func (q *consistencyQueue) shouldQueue(
getQueueLastProcessed: func(ctx context.Context) (hlc.Timestamp, error) {
return repl.getQueueLastProcessed(ctx, q.name)
},
isNodeLive: func(nodeID roachpb.NodeID) (bool, error) {
isNodeAvailable: func(nodeID roachpb.NodeID) bool {
if repl.store.cfg.NodeLiveness != nil {
return repl.store.cfg.NodeLiveness.IsLive(nodeID)
return repl.store.cfg.NodeLiveness.IsAvailableNotDraining(nodeID)
}
// Some tests run without a NodeLiveness configured.
return true, nil
return true
},
disableLastProcessedCheck: repl.store.cfg.TestingKnobs.DisableLastProcessedCheck,
interval: q.interval(),
@@ -136,12 +136,9 @@ func consistencyQueueShouldQueueImpl(
return false, 0
}
}
// Check if all replicas are live.
// Check if all replicas are available.
for _, rep := range data.desc.Replicas().Descriptors() {
if live, err := data.isNodeLive(rep.NodeID); err != nil {
log.VErrEventf(ctx, 3, "node %d liveness failed: %s", rep.NodeID, err)
return false, 0
} else if !live {
if !data.isNodeAvailable(rep.NodeID) {
return false, 0
}
}
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/consistency_queue_test.go
@@ -70,20 +70,20 @@ func TestConsistencyQueueRequiresLive(t *testing.T) {
return testStart, nil
}

isNodeLive := func(nodeID roachpb.NodeID) (bool, error) {
return live, nil
isNodeAvailable := func(nodeID roachpb.NodeID) bool {
return live
}

if shouldQ, priority := kvserver.ConsistencyQueueShouldQueue(
context.Background(), clock.NowAsClockTimestamp(), desc, getQueueLastProcessed, isNodeLive,
context.Background(), clock.NowAsClockTimestamp(), desc, getQueueLastProcessed, isNodeAvailable,
false, interval); !shouldQ {
t.Fatalf("expected shouldQ true; got %t, %f", shouldQ, priority)
}

live = false

if shouldQ, priority := kvserver.ConsistencyQueueShouldQueue(
context.Background(), clock.NowAsClockTimestamp(), desc, getQueueLastProcessed, isNodeLive,
context.Background(), clock.NowAsClockTimestamp(), desc, getQueueLastProcessed, isNodeAvailable,
false, interval); shouldQ {
t.Fatalf("expected shouldQ false; got %t, %f", shouldQ, priority)
}
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/helpers_test.go
@@ -98,12 +98,12 @@ func ConsistencyQueueShouldQueue(
now hlc.ClockTimestamp,
desc *roachpb.RangeDescriptor,
getQueueLastProcessed func(ctx context.Context) (hlc.Timestamp, error),
isNodeLive func(nodeID roachpb.NodeID) (bool, error),
isNodeAvailable func(nodeID roachpb.NodeID) bool,
disableLastProcessedCheck bool,
interval time.Duration,
) (bool, float64) {
return consistencyQueueShouldQueueImpl(ctx, now, consistencyShouldQueueData{
desc, getQueueLastProcessed, isNodeLive,
desc, getQueueLastProcessed, isNodeAvailable,
disableLastProcessedCheck, interval})
}

24 changes: 24 additions & 0 deletions pkg/kv/kvserver/liveness/liveness.go
@@ -642,6 +642,30 @@ func (nl *NodeLiveness) IsLive(nodeID roachpb.NodeID) (bool, error) {
return liveness.IsLive(nl.clock.Now().GoTime()), nil
}

// IsAvailable returns whether or not the specified node is available to serve
// requests. It checks both the liveness and decommissioned states, but not
// draining or decommissioning (since it may still be a leaseholder for ranges).
// Returns false if the node is not in the local liveness table.
func (nl *NodeLiveness) IsAvailable(nodeID roachpb.NodeID) bool {
liveness, ok := nl.GetLiveness(nodeID)
return ok && liveness.IsLive(nl.clock.Now().GoTime()) && !liveness.Membership.Decommissioned()
}

// IsAvailableNotDraining returns whether or not the specified node is available
// to serve requests (i.e. it is live and not decommissioned) and is not in the
// process of draining/decommissioning. Note that draining/decommissioning nodes
// could still be leaseholders for ranges until drained, so this should not be
// used when the caller needs to be able to contact leaseholders directly.
// Returns false if the node is not in the local liveness table.
func (nl *NodeLiveness) IsAvailableNotDraining(nodeID roachpb.NodeID) bool {
liveness, ok := nl.GetLiveness(nodeID)
return ok &&
liveness.IsLive(nl.clock.Now().GoTime()) &&
!liveness.Membership.Decommissioning() &&
!liveness.Membership.Decommissioned() &&
!liveness.Draining
}

// NodeLivenessStartOptions are the arguments to `NodeLiveness.Start`.
type NodeLivenessStartOptions struct {
Stopper *stop.Stopper
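A usage note on the helpers above: both call sites in this commit have to cope with a missing NodeLiveness (some tests, and SQL tenant servers) by treating every node as available. The sketch below condenses that fallback pattern; makeIsAvailable is a hypothetical wrapper written for illustration, not something this patch adds, and it assumes the package layout shown in the diff headers.

```go
// Illustrative only: a condensed version of the fallback pattern used in
// consistency_queue.go and server_sql.go in this commit.
package example

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// makeIsAvailable returns a node-availability predicate. When no NodeLiveness
// is configured (tests, SQL tenants), every node is treated as available,
// matching the closures in the diffs below.
func makeIsAvailable(nl *liveness.NodeLiveness, strict bool) func(roachpb.NodeID) bool {
	if nl == nil {
		return func(roachpb.NodeID) bool { return true }
	}
	if strict {
		// Consistency checking also avoids draining/decommissioning nodes.
		return nl.IsAvailableNotDraining
	}
	// DistSQL planning only avoids dead/decommissioned nodes.
	return nl.IsAvailable
}
```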
15 changes: 10 additions & 5 deletions pkg/server/server_sql.go
@@ -490,15 +490,20 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
NodeID: cfg.nodeIDContainer,
}

var isLive func(roachpb.NodeID) (bool, error)
var isAvailable func(roachpb.NodeID) bool
nodeLiveness, ok := cfg.nodeLiveness.Optional(47900)
if ok {
isLive = nodeLiveness.IsLive
// TODO(erikgrinaker): We may want to use IsAvailableNotDraining instead, to
// avoid scheduling long-running flows (e.g. rangefeeds or backups) on nodes
// that are being drained/decommissioned. However, these nodes can still be
// leaseholders, and preventing processor scheduling on them can cause a
// performance cliff for e.g. table reads that then hit the network.
isAvailable = nodeLiveness.IsAvailable
} else {
// We're on a SQL tenant, so this is the only node DistSQL will ever
// schedule on - always returning true is fine.
isLive = func(roachpb.NodeID) (bool, error) {
return true, nil
isAvailable = func(roachpb.NodeID) bool {
return true
}
}

@@ -544,7 +549,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
cfg.nodeDescs,
cfg.gossip,
cfg.stopper,
isLive,
isAvailable,
cfg.nodeDialer,
),

2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_internal_test.go
@@ -291,7 +291,7 @@ func startConnExecutor(
nil, /* nodeDescs */
gw,
stopper,
func(roachpb.NodeID) (bool, error) { return true, nil }, // everybody is live
func(roachpb.NodeID) bool { return true }, // everybody is available
nil, /* nodeDialer */
),
QueryCache: querycache.New(0),
28 changes: 10 additions & 18 deletions pkg/sql/distsql_physical_planner.go
@@ -144,7 +144,7 @@ func NewDistSQLPlanner(
nodeDescs kvcoord.NodeDescStore,
gw gossip.OptionalGossip,
stopper *stop.Stopper,
isLive func(roachpb.NodeID) (bool, error),
isAvailable func(roachpb.NodeID) bool,
nodeDialer *nodedialer.Dialer,
) *DistSQLPlanner {
dsp := &DistSQLPlanner{
@@ -156,9 +156,9 @@
gossip: gw,
nodeDialer: nodeDialer,
nodeHealth: distSQLNodeHealth{
gossip: gw,
connHealth: nodeDialer.ConnHealth,
isLive: isLive,
gossip: gw,
connHealth: nodeDialer.ConnHealth,
isAvailable: isAvailable,
},
distSender: distSender,
nodeDescs: nodeDescs,
@@ -834,9 +834,9 @@ type SpanPartition struct {
}

type distSQLNodeHealth struct {
gossip gossip.OptionalGossip
isLive func(roachpb.NodeID) (bool, error)
connHealth func(roachpb.NodeID, rpc.ConnectionClass) error
gossip gossip.OptionalGossip
isAvailable func(roachpb.NodeID) bool
connHealth func(roachpb.NodeID, rpc.ConnectionClass) error
}

func (h *distSQLNodeHealth) check(ctx context.Context, nodeID roachpb.NodeID) error {
@@ -857,16 +857,8 @@ func (h *distSQLNodeHealth) check(ctx context.Context, nodeID roachpb.NodeID) error {
return err
}
}
{
live, err := h.isLive(nodeID)
if err == nil && !live {
err = pgerror.Newf(pgcode.CannotConnectNow,
"node n%d is not live", errors.Safe(nodeID))
}
if err != nil {
return pgerror.Wrapf(err, pgcode.CannotConnectNow,
"not using n%d due to liveness", errors.Safe(nodeID))
}
if !h.isAvailable(nodeID) {
return pgerror.Newf(pgcode.CannotConnectNow, "not using n%d since it is not available", nodeID)
}

// Check that the node is not draining.
@@ -884,7 +876,7 @@ func (h *distSQLNodeHealth) check(ctx context.Context, nodeID roachpb.NodeID) error {
}

if drainingInfo.Draining {
err := errors.Newf("not using n%d because it is draining", log.Safe(nodeID))
err := errors.Newf("not using n%d because it is draining", nodeID)
log.VEventf(ctx, 1, "%v", err)
return err
}
44 changes: 20 additions & 24 deletions pkg/sql/distsql_physical_planner_test.go
@@ -867,8 +867,8 @@ func TestPartitionSpans(t *testing.T) {
}
return nil
},
isLive: func(nodeID roachpb.NodeID) (bool, error) {
return true, nil
isAvailable: func(nodeID roachpb.NodeID) bool {
return true
},
},
}
@@ -1051,8 +1051,8 @@ func TestPartitionSpansSkipsIncompatibleNodes(t *testing.T) {
// All the nodes are healthy.
return nil
},
isLive: func(roachpb.NodeID) (bool, error) {
return true, nil
isAvailable: func(roachpb.NodeID) bool {
return true
},
},
}
@@ -1150,8 +1150,8 @@ func TestPartitionSpansSkipsNodesNotInGossip(t *testing.T) {
_, err := mockGossip.GetNodeIDAddress(node)
return err
},
isLive: func(roachpb.NodeID) (bool, error) {
return true, nil
isAvailable: func(roachpb.NodeID) bool {
return true
},
},
}
@@ -1215,14 +1215,11 @@ func TestCheckNodeHealth(t *testing.T) {
t.Fatal(err)
}

errLive := func(roachpb.NodeID) (bool, error) {
return false, errors.New("injected liveness error")
notAvailable := func(roachpb.NodeID) bool {
return false
}
notLive := func(roachpb.NodeID) (bool, error) {
return false, nil
}
live := func(roachpb.NodeID) (bool, error) {
return true, nil
available := func(roachpb.NodeID) bool {
return true
}

connHealthy := func(roachpb.NodeID, rpc.ConnectionClass) error {
@@ -1234,21 +1231,20 @@
_ = connUnhealthy

livenessTests := []struct {
isLive func(roachpb.NodeID) (bool, error)
exp string
isAvailable func(roachpb.NodeID) bool
exp string
}{
{live, ""},
{errLive, "not using n5 due to liveness: injected liveness error"},
{notLive, "not using n5 due to liveness: node n5 is not live"},
{available, ""},
{notAvailable, "not using n5 since it is not available"},
}

gw := gossip.MakeOptionalGossip(mockGossip)
for _, test := range livenessTests {
t.Run("liveness", func(t *testing.T) {
h := distSQLNodeHealth{
gossip: gw,
connHealth: connHealthy,
isLive: test.isLive,
gossip: gw,
connHealth: connHealthy,
isAvailable: test.isAvailable,
}
if err := h.check(context.Background(), nodeID); !testutils.IsError(err, test.exp) {
t.Fatalf("expected %v, got %v", test.exp, err)
@@ -1267,9 +1263,9 @@
for _, test := range connHealthTests {
t.Run("connHealth", func(t *testing.T) {
h := distSQLNodeHealth{
gossip: gw,
connHealth: test.connHealth,
isLive: live,
gossip: gw,
connHealth: test.connHealth,
isAvailable: available,
}
if err := h.check(context.Background(), nodeID); !testutils.IsError(err, test.exp) {
t.Fatalf("expected %v, got %v", test.exp, err)
2 changes: 2 additions & 0 deletions pkg/sql/optionalnodeliveness/node_liveness.go
@@ -23,6 +23,8 @@ type Interface interface {
Self() (livenesspb.Liveness, bool)
GetLivenesses() []livenesspb.Liveness
GetLivenessesFromKV(ctx context.Context) ([]livenesspb.Liveness, error)
IsAvailable(roachpb.NodeID) bool
IsAvailableNotDraining(roachpb.NodeID) bool
IsLive(roachpb.NodeID) (bool, error)
}

