Skip to content

Commit

Permalink
Merge #26821
Browse files Browse the repository at this point in the history
26821: server: general correctness fixes for ListSessions / Ranges / upgradeStatus r=knz a=knz

Prior to this patch, various parts of the code what were attempting to
"iterate over all nodes in the cluster" were confused and would
either:

- skip over nodes that are known to exist in KV but for which gossip
  information was currently unavailable, or
- fail to skip over removed nodes (dead + decommissioned) or
- incorrectly skip over temporarily failed nodes.

This patch comprehensively addresses this class of issues by:

- tweaking `(*NodeLiveness).GetIsLiveMap()` to exclude decommissioned
  nodes upfront.
- tweaking `(*NodeLiveness).GetLivenessStatusMap()` to conditionally
  exclude decommissioned nodes upfront, and changing all callers
  accordingly.
- providing a new method `(*statusServer).NodesWithLiveness()` which
  always includes all nodes known in KV, not just those for which
  gossip information is available.
- ensuring that `(*Server).upgradeStatus()` does not incorrectly skip
  over temporarily dead nodes or nodes yet unknown to gossip by
  using the new `NodesWithLiveness()` method appropriately.
- providing a new method `(*statusServer).iterateNodes()` which does
  "the right thing" via `NodesWithLiveness()` and using it for
  the status RPCs `ListSessions()` and `Range()`.

Additionally this patch makes SHOW QUERIES and SHOW SESSIONS report
error details when it fails to query information from a node.

Release note (bug fix): the server will not finalize a version upgrade
automatically and erroneously if there are nodes temporarily inactive
in the cluster.

Release note (sql change): SHOW CLUSTER QUERIES/SESSIONS now report
the details of the error upon failing to gather data from other nodes.

Fixes #22863.
Fixes #26852.
Fixes #26897.

Co-authored-by: Raphael 'kena' Poss <[email protected]>
  • Loading branch information
craig[bot] and knz committed Jun 29, 2018
2 parents fd96096 + 1ab9cb5 commit f164295
Show file tree
Hide file tree
Showing 10 changed files with 404 additions and 164 deletions.
6 changes: 4 additions & 2 deletions pkg/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,9 +831,11 @@ func (g *Gossip) getNodeDescriptorLocked(nodeID roachpb.NodeID) (*roachpb.NodeDe
}
// Don't return node descriptors that are empty, because that's meant to
// indicate that the node has been removed from the cluster.
if !(nodeDescriptor.NodeID == 0 && nodeDescriptor.Address.IsEmpty()) {
return nodeDescriptor, nil
if nodeDescriptor.NodeID == 0 || nodeDescriptor.Address.IsEmpty() {
return nil, errors.Errorf("node %d has been removed from the cluster", nodeID)
}

return nodeDescriptor, nil
}

return nil, errors.Errorf("unable to look up descriptor for node %d", nodeID)
Expand Down
4 changes: 2 additions & 2 deletions pkg/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestGossipOverwriteNode(t *testing.T) {
// Quiesce the stopper now to ensure that the update has propagated before
// checking whether node 1 has been removed from the infoStore.
stopper.Quiesce(context.TODO())
expectedErr := "unable to look up descriptor for node"
expectedErr := "node.*has been removed from the cluster"
if val, err := g.GetNodeDescriptor(node1.NodeID); !testutils.IsError(err, expectedErr) {
t.Errorf("expected error %q fetching node %d; got error %v and node %+v",
expectedErr, node1.NodeID, err, val)
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestGossipMoveNode(t *testing.T) {
} else if !proto.Equal(movedNode, val) {
t.Errorf("expected node %+v, got %+v", movedNode, val)
}
expectedErr := "unable to look up descriptor for node"
expectedErr := "node.*has been removed from the cluster"
if val, err := g.GetNodeDescriptor(replacedNode.NodeID); !testutils.IsError(err, expectedErr) {
t.Errorf("expected error %q fetching node %d; got error %v and node %+v",
expectedErr, replacedNode.NodeID, err, val)
Expand Down
4 changes: 3 additions & 1 deletion pkg/server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1089,7 +1089,9 @@ func (s *adminServer) Health(
return &serverpb.HealthResponse{}, nil
}

// Liveness returns the liveness state of all nodes on the cluster.
// Liveness returns the liveness state of all nodes on the cluster
// known to gossip. To reach all nodes in the cluster, consider
// using (statusServer).NodesWithLiveness instead.
func (s *adminServer) Liveness(
context.Context, *serverpb.LivenessRequest,
) (*serverpb.LivenessResponse, error) {
Expand Down
38 changes: 14 additions & 24 deletions pkg/server/server_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/pkg/errors"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -105,26 +104,24 @@ func (s *Server) upgradeStatus(ctx context.Context) (bool, error) {
return false, err
}

datums, _, err := s.internalExecutor.Query(
ctx, "read-gossip", nil, /* txn */
"SELECT node_id, server_version FROM crdb_internal.gossip_nodes;",
)
nodesWithLiveness, err := s.status.NodesWithLiveness(ctx)
if err != nil {
return false, err
}

statusMap := s.nodeLiveness.GetLivenessStatusMap()
var newVersion string
for _, row := range datums {
id := roachpb.NodeID(int32(tree.MustBeDInt(row[0])))
version := string(tree.MustBeDString(row[1]))

if statusMap[id] == storage.NodeLivenessStatus_LIVE {
if newVersion == "" {
newVersion = version
} else if version != newVersion {
return false, errors.New("not all nodes are running the latest version yet")
}
for nodeID, st := range nodesWithLiveness {
if st.LivenessStatus != storage.NodeLivenessStatus_LIVE &&
st.LivenessStatus != storage.NodeLivenessStatus_DECOMMISSIONING {
return false, errors.Errorf("node %d not running (%s), cannot determine version",
nodeID, st.LivenessStatus)
}

version := st.Desc.ServerVersion.String()
if newVersion == "" {
newVersion = version
} else if version != newVersion {
return false, errors.New("not all nodes are running the latest version yet")
}
}

Expand All @@ -134,7 +131,7 @@ func (s *Server) upgradeStatus(ctx context.Context) (bool, error) {
}

// Check if auto upgrade is enabled at current version.
datums, _, err = s.internalExecutor.Query(
datums, _, err := s.internalExecutor.Query(
ctx, "read-downgrade", nil, /* txn */
"SELECT value FROM system.settings WHERE name = 'cluster.preserve_downgrade_option';",
)
Expand All @@ -151,13 +148,6 @@ func (s *Server) upgradeStatus(ctx context.Context) (bool, error) {
}
}

// Check if all non-decommissioned nodes are alive.
for id, status := range statusMap {
if status != storage.NodeLivenessStatus_DECOMMISSIONED && status != storage.NodeLivenessStatus_LIVE {
return false, errors.Errorf("node %d is not decommissioned but not alive, node status: %s.",
id, storage.NodeLivenessStatus_name[int32(status)])
}
}
return false, nil
}

Expand Down
Loading

0 comments on commit f164295

Please sign in to comment.