cli: Fix debug zip for KV nodes
This commit fixes a bug where the nodes.json
and status.json data for storage servers was not
being populated correctly after a recent switch
to the NodesList API.
The fix uses the Nodes API when the debug zip
command is run against a storage (KV) server and
keeps the NodesList API for SQL-only servers.

Release note: None
rimadeodhar committed Feb 16, 2022
1 parent ae8b49b commit 13c7cf5
Showing 4 changed files with 120 additions and 68 deletions.
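The heart of the change is a fallback between two status RPCs: prefer the richer Nodes endpoint, and fall back to NodesList when a server reports it as unimplemented (as a SQL-only server may). A minimal sketch of that pattern, using a hypothetical statusClient interface and simplified response types rather than CockroachDB's serverpb API:

```go
package debugzip

import (
	"context"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// nodesResponse and nodesListResponse are simplified stand-ins for the
// detailed and lightweight node payloads; they are not CockroachDB types.
type nodesResponse struct{ Addrs []string }
type nodesListResponse struct{ Addrs []string }

// statusClient is a hypothetical client exposing both RPCs.
type statusClient interface {
	Nodes(ctx context.Context) (*nodesResponse, error)
	NodesList(ctx context.Context) (*nodesListResponse, error)
}

// fetchNodes prefers the detailed Nodes RPC (KV servers) and falls back to
// NodesList when the method is unimplemented (likely a SQL-only server).
func fetchNodes(ctx context.Context, c statusClient) (*nodesResponse, *nodesListResponse, error) {
	detailed, err := c.Nodes(ctx)
	if status.Code(err) == codes.Unimplemented {
		list, lerr := c.NodesList(ctx)
		return nil, list, lerr
	}
	if err != nil {
		return nil, nil, err
	}
	// Derive the lightweight list from the detailed response so callers can
	// always iterate over a node list, as the debug zip code does.
	list := &nodesListResponse{Addrs: detailed.Addrs}
	return detailed, list, nil
}
```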
45 changes: 30 additions & 15 deletions pkg/cli/zip.go
@@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/heapprofiler"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/server/status/statuspb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/errors"
@@ -94,13 +95,24 @@ func (zc *debugZipContext) runZipRequest(ctx context.Context, zr *zipReporter, r
// forAllNodes runs fn on every node, possibly concurrently.
func (zc *debugZipContext) forAllNodes(
ctx context.Context,
nodeList []serverpb.NodeDetails,
fn func(ctx context.Context, node serverpb.NodeDetails) error,
ni nodesInfo,
fn func(ctx context.Context, nodeDetails serverpb.NodeDetails, nodeStatus *statuspb.NodeStatus) error,
) error {
if ni.nodeListResponse == nil {
// A missing node list is unexpected; fail loudly.
return errors.AssertionFailedf("nodes list is empty")
}
if ni.nodeStatusResponse != nil && len(ni.nodeStatusResponse.Nodes) != len(ni.nodeListResponse.Nodes) {
return errors.AssertionFailedf("mismatching node status response and node list")
}
if zipCtx.concurrency == 1 {
// Sequential case. Simplify.
for _, node := range nodeList {
if err := fn(ctx, node); err != nil {
for index, nodeDetails := range ni.nodeListResponse.Nodes {
var nodeStatus *statuspb.NodeStatus
if ni.nodeStatusResponse != nil {
nodeStatus = &ni.nodeStatusResponse.Nodes[index]
}
if err := fn(ctx, nodeDetails, nodeStatus); err != nil {
return err
}
}
@@ -110,27 +122,31 @@ func (zc *debugZipContext) forAllNodes(
// Multiple nodes concurrently.

// nodeErrs collects the individual error objects.
nodeErrs := make(chan error, len(nodeList))
nodeErrs := make(chan error, len(ni.nodeListResponse.Nodes))
// The wait group to wait for all concurrent collectors.
var wg sync.WaitGroup
for _, node := range nodeList {
for index, nodeDetails := range ni.nodeListResponse.Nodes {
wg.Add(1)
go func(node serverpb.NodeDetails) {
var nodeStatus *statuspb.NodeStatus
if ni.nodeStatusResponse != nil {
nodeStatus = &ni.nodeStatusResponse.Nodes[index]
}
go func(nodeDetails serverpb.NodeDetails, nodeStatus *statuspb.NodeStatus) {
defer wg.Done()
if err := zc.sem.Acquire(ctx, 1); err != nil {
nodeErrs <- err
return
}
defer zc.sem.Release(1)

nodeErrs <- fn(ctx, node)
}(node)
nodeErrs <- fn(ctx, nodeDetails, nodeStatus)
}(nodeDetails, nodeStatus)
}
wg.Wait()

// The final error.
var err error
for range nodeList {
for range ni.nodeListResponse.Nodes {
err = errors.CombineErrors(err, <-nodeErrs)
}
return err
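forAllNodes fans the per-node callback out under a concurrency limit and then combines one error per node. A standalone sketch of the same pattern, assuming golang.org/x/sync/semaphore and illustrative names (forAll, items) rather than the repo's types:

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/semaphore"
)

// forAll runs fn once per item with at most `limit` calls in flight, then
// combines the per-item errors into a single error (nil if all succeeded).
func forAll(ctx context.Context, items []string, limit int64, fn func(context.Context, string) error) error {
	sem := semaphore.NewWeighted(limit)
	errCh := make(chan error, len(items)) // one slot per item, so goroutines never block
	for _, it := range items {
		it := it // capture loop variable for the goroutine (pre-Go 1.22)
		go func() {
			if err := sem.Acquire(ctx, 1); err != nil {
				errCh <- err
				return
			}
			defer sem.Release(1)
			errCh <- fn(ctx, it)
		}()
	}
	var combined error
	for range items {
		combined = errors.Join(combined, <-errCh)
	}
	return combined
}

func main() {
	nodes := []string{"n1", "n2", "n3", "n4"}
	err := forAll(context.Background(), nodes, 2, func(ctx context.Context, node string) error {
		fmt.Println("collecting from", node)
		return nil
	})
	fmt.Println("combined error:", err)
}
```

Reading exactly len(items) errors from the buffered channel also acts as the synchronization point, which is why no WaitGroup is strictly needed in this sketch.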
@@ -231,20 +247,19 @@ func runDebugZip(_ *cobra.Command, args []string) (retErr error) {
// For a SQL only server, the nodeList will be a list of SQL nodes
// and livenessByNodeID is null. For a KV server, the nodeList will
// be a list of KV nodes along with the corresponding node liveness data.
nodeList, livenessByNodeID, err := zc.collectClusterData(ctx, firstNodeDetails)
ni, livenessByNodeID, err := zc.collectClusterData(ctx, firstNodeDetails)
if err != nil {
return err
}

// Collect the CPU profiles, before the other per-node requests
// below possibly influences the nodes and thus CPU profiles.
if err := zc.collectCPUProfiles(ctx, nodeList, livenessByNodeID); err != nil {
if err := zc.collectCPUProfiles(ctx, ni, livenessByNodeID); err != nil {
return err
}

// Collect the per-node data.
if err := zc.forAllNodes(ctx, nodeList, func(ctx context.Context, node serverpb.NodeDetails) error {
return zc.collectPerNodeData(ctx, node, livenessByNodeID)
if err := zc.forAllNodes(ctx, ni, func(ctx context.Context, nodeDetails serverpb.NodeDetails, nodesStatus *statuspb.NodeStatus) error {
return zc.collectPerNodeData(ctx, nodeDetails, nodesStatus, livenessByNodeID)
}); err != nil {
return err
}
110 changes: 66 additions & 44 deletions pkg/cli/zip_cluster_wide.go
Original file line number Diff line number Diff line change
@@ -112,35 +112,25 @@ var debugZipTablesPerCluster = []string{
"crdb_internal.table_indexes",
}

// getNodesList constructs a NodesListResponse using the Nodes API. We need this while building
// the nodes list for older servers that don't support the new NodesList API.
func (zc *debugZipContext) getNodesList(ctx context.Context) (*serverpb.NodesListResponse, error) {
nodes, err := zc.status.Nodes(ctx, &serverpb.NodesRequest{})
if err != nil {
return nil, err
}
nodesList := &serverpb.NodesListResponse{}
for _, node := range nodes.Nodes {
nodeDetails := serverpb.NodeDetails{
NodeID: int32(node.Desc.NodeID),
Address: node.Desc.Address,
SQLAddress: node.Desc.SQLAddress,
}
nodesList.Nodes = append(nodesList.Nodes, nodeDetails)
}
return nodesList, nil
// nodesInfo holds node details pulled from a SQL or KV node.
// SQL-only servers return only nodeDetails, for all SQL nodes.
// Storage servers return both nodeDetails and nodeStatus
// for all KV nodes.
type nodesInfo struct {
nodeStatusResponse *serverpb.NodesResponse
nodeListResponse *serverpb.NodesListResponse
}

// collectClusterData runs the data collection that only needs to
// occur once for the entire cluster.
func (zc *debugZipContext) collectClusterData(
ctx context.Context, firstNodeDetails *serverpb.DetailsResponse,
) (nodeList []serverpb.NodeDetails, livenessByNodeID nodeLivenesses, err error) {
) (ni nodesInfo, livenessByNodeID nodeLivenesses, err error) {
clusterWideZipRequests := makeClusterWideZipRequests(zc.admin, zc.status)

for _, r := range clusterWideZipRequests {
if err := zc.runZipRequest(ctx, zc.clusterPrinter, r); err != nil {
return nil, nil, err
return nodesInfo{}, nil, err
}
}

@@ -150,39 +140,38 @@ func (zc *debugZipContext) collectClusterData(
query = override
}
if err := zc.dumpTableDataForZip(zc.clusterPrinter, zc.firstNodeSQLConn, debugBase, table, query); err != nil {
return nil, nil, errors.Wrapf(err, "fetching %s", table)
return nodesInfo{}, nil, errors.Wrapf(err, "fetching %s", table)
}
}

{
var nodes *serverpb.NodesListResponse
s := zc.clusterPrinter.start("requesting nodes")
err := zc.runZipFn(ctx, s, func(ctx context.Context) error {
nodes, err = zc.status.NodesList(ctx, &serverpb.NodesListRequest{})
if code := status.Code(errors.Cause(err)); code == codes.Unimplemented {
// Fallback to the old Nodes API; this could occur while connecting to
// an older node which does not have the NodesList API implemented.
nodes, err = zc.getNodesList(ctx)
}
ni, err = zc.nodesInfo(ctx)
return err
})
if cErr := zc.z.createJSONOrError(s, debugBase+"/nodes.json", nodes, err); cErr != nil {
return nil, nil, cErr
if ni.nodeStatusResponse != nil {
if cErr := zc.z.createJSONOrError(s, debugBase+"/nodes.json", ni.nodeStatusResponse, err); cErr != nil {
return nodesInfo{}, nil, cErr
}
} else {
if cErr := zc.z.createJSONOrError(s, debugBase+"/nodes.json", ni.nodeListResponse, err); cErr != nil {
return nodesInfo{}, nil, cErr
}
}

// In case nodes came up back empty (the Nodes() RPC failed), we
// still want to inspect the per-node endpoints on the head
// node. As per the above, we were able to connect at least to
// that.
nodeList = []serverpb.NodeDetails{{
NodeID: int32(firstNodeDetails.NodeID),
Address: firstNodeDetails.Address,
SQLAddress: firstNodeDetails.SQLAddress,
}}

if nodes != nil {
// If the nodes were found, use that instead.
nodeList = nodes.Nodes
if ni.nodeListResponse == nil {
// In case nodes came up back empty (the Nodes()/NodesList() RPC failed), we
// still want to inspect the per-node endpoints on the head
// node. As per the above, we were able to connect at least to
// that.
ni.nodeListResponse = &serverpb.NodesListResponse{
Nodes: []serverpb.NodeDetails{{
NodeID: int32(firstNodeDetails.NodeID),
Address: firstNodeDetails.Address,
SQLAddress: firstNodeDetails.SQLAddress,
}},
}
}

// We'll want livenesses to decide whether a node is decommissioned.
@@ -193,12 +182,45 @@ func (zc *debugZipContext) collectClusterData(
return err
})
if cErr := zc.z.createJSONOrError(s, livenessName+".json", nodes, err); cErr != nil {
return nil, nil, cErr
return nodesInfo{}, nil, cErr
}
livenessByNodeID = map[roachpb.NodeID]livenesspb.NodeLivenessStatus{}
if lresponse != nil {
livenessByNodeID = lresponse.Statuses
}
}
return nodeList, livenessByNodeID, nil
return ni, livenessByNodeID, nil
}

// nodesInfo constructs debug data for all nodes for the debug zip output.
// For SQL-only servers, only the NodesListResponse is populated.
// For regular storage servers, the more detailed NodesResponse is
// returned along with the NodesListResponse.
func (zc *debugZipContext) nodesInfo(ctx context.Context) (ni nodesInfo, _ error) {
nodesResponse, err := zc.status.Nodes(ctx, &serverpb.NodesRequest{})
nodesList := &serverpb.NodesListResponse{}
if code := status.Code(errors.Cause(err)); code == codes.Unimplemented {
// Likely a SQL only server; try the NodesList endpoint.
nodesList, err = zc.status.NodesList(ctx, &serverpb.NodesListRequest{})
}
if err != nil {
return nodesInfo{}, err
}
if nodesResponse != nil {
// Build a NodesListResponse from the nodes data. The NodesListResponse is
// needed further downstream for other debug zip functionality, such as
// collecting per-node debug data.
for _, node := range nodesResponse.Nodes {
nodeDetails := serverpb.NodeDetails{
NodeID: int32(node.Desc.NodeID),
Address: node.Desc.Address,
SQLAddress: node.Desc.SQLAddress,
}
nodesList.Nodes = append(nodesList.Nodes, nodeDetails)
}
}
ni.nodeListResponse = nodesList
ni.nodeStatusResponse = nodesResponse

return ni, nil
}
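nodes.json and status.json end up as entries in the debug zip via the command's own createJSON/createJSONOrError helpers. As a rough, standalone analogue rather than the repo's implementation, marshalling a payload straight into a zip entry looks like this:

```go
package main

import (
	"archive/zip"
	"encoding/json"
	"log"
	"os"
)

// writeJSONEntry marshals v and stores it as `name` inside the zip writer.
func writeJSONEntry(zw *zip.Writer, name string, v interface{}) error {
	w, err := zw.Create(name)
	if err != nil {
		return err
	}
	enc := json.NewEncoder(w)
	enc.SetIndent("", "  ")
	return enc.Encode(v)
}

func main() {
	f, err := os.Create("debug.zip")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	zw := zip.NewWriter(f)
	defer zw.Close()

	// Hypothetical payload standing in for a NodesResponse/NodesListResponse.
	nodes := []map[string]interface{}{{"node_id": 1, "address": "localhost:26257"}}
	if err := writeJSONEntry(zw, "debug/nodes.json", nodes); err != nil {
		log.Fatal(err)
	}
}
```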
32 changes: 23 additions & 9 deletions pkg/cli/zip_per_node.go
@@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/server/status/statuspb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -91,9 +92,8 @@ var debugZipTablesPerNode = []string{
//
// This is called first and in isolation, before other zip operations
// possibly influence the nodes.
// TODO(rima): Collect profiles for tenant SQL nodes.
func (zc *debugZipContext) collectCPUProfiles(
ctx context.Context, nodeList []serverpb.NodeDetails, livenessByNodeID nodeLivenesses,
ctx context.Context, ni nodesInfo, livenessByNodeID nodeLivenesses,
) error {
if zipCtx.cpuProfDuration <= 0 {
// Nothing to do; return early.
@@ -108,6 +108,11 @@ func (zc *debugZipContext) collectCPUProfiles(

zc.clusterPrinter.info("requesting CPU profiles")

if ni.nodeListResponse == nil {
return errors.AssertionFailedf("nodes list is empty; nothing to do")
}

nodeList := ni.nodeListResponse.Nodes
// NB: this takes care not to produce non-deterministic log output.
resps := make([]profData, len(nodeList))
for i := range nodeList {
@@ -164,9 +169,12 @@ func (zc *debugZipContext) collectCPUProfiles(
}
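collectCPUProfiles gathers CPU profiles from every node up front, before the other per-node requests can perturb them. CockroachDB does this through its status RPCs; purely as an illustration, fetching a profile from a standard net/http/pprof endpoint could look like the sketch below (the base URL, port, and timeout are assumptions):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"os"
	"time"
)

// fetchCPUProfile grabs a CPU profile of the given duration from a node's
// pprof endpoint. The URL layout assumes a standard net/http/pprof handler.
func fetchCPUProfile(baseURL string, seconds int) ([]byte, error) {
	url := fmt.Sprintf("%s/debug/pprof/profile?seconds=%d", baseURL, seconds)
	client := &http.Client{Timeout: time.Duration(seconds+30) * time.Second}
	resp, err := client.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status %s", resp.Status)
	}
	return io.ReadAll(resp.Body)
}

func main() {
	// The address below is an assumption; point it at a real pprof endpoint.
	prof, err := fetchCPUProfile("http://localhost:8080", 5)
	if err != nil {
		fmt.Fprintln(os.Stderr, "profile fetch failed:", err)
		return
	}
	_ = os.WriteFile("cpu.pprof", prof, 0o644)
}
```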

func (zc *debugZipContext) collectPerNodeData(
ctx context.Context, node serverpb.NodeDetails, livenessByNodeID nodeLivenesses,
ctx context.Context,
nodeDetails serverpb.NodeDetails,
nodeStatus *statuspb.NodeStatus,
livenessByNodeID nodeLivenesses,
) error {
nodeID := roachpb.NodeID(node.NodeID)
nodeID := roachpb.NodeID(nodeDetails.NodeID)

if livenessByNodeID != nil {
liveness := livenessByNodeID[nodeID]
@@ -195,9 +203,15 @@ }
}
return nil
}

if err := zc.z.createJSON(nodePrinter.start("node status"), prefix+"/status.json", node); err != nil {
return err
if nodeStatus != nil {
// Use nodeStatus to populate the status.json file as it contains more data for a KV node.
if err := zc.z.createJSON(nodePrinter.start("node status"), prefix+"/status.json", *nodeStatus); err != nil {
return err
}
} else {
if err := zc.z.createJSON(nodePrinter.start("node status"), prefix+"/status.json", nodeDetails); err != nil {
return err
}
}

// Don't use sqlConn because that's only for the node `debug
Expand All @@ -207,9 +221,9 @@ func (zc *debugZipContext) collectPerNodeData(
// not work and if it doesn't, we let the invalid curSQLConn get
// used anyway so that anything that does *not* need it will
// still happen.
sqlAddr := node.SQLAddress
sqlAddr := nodeDetails.SQLAddress
if sqlAddr.IsEmpty() {
sqlAddr = node.Address
sqlAddr = nodeDetails.Address
}
curSQLConn := guessNodeURL(zc.firstNodeSQLConn.GetURL(), sqlAddr.AddressField)
nodePrinter.info("using SQL connection URL: %s", curSQLConn.GetURL())
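guessNodeURL above rewrites the head node's SQL connection URL so that each node is contacted at its own SQL address. A minimal sketch of that host-swap idea with net/url, independent of the repo's helper (the example URL and address are made up):

```go
package main

import (
	"fmt"
	"log"
	"net/url"
)

// withHost returns baseURL with its host:port replaced by addr, keeping the
// scheme, user info, path, and query parameters intact.
func withHost(baseURL, addr string) (string, error) {
	u, err := url.Parse(baseURL)
	if err != nil {
		return "", err
	}
	u.Host = addr
	return u.String(), nil
}

func main() {
	base := "postgresql://root@head-node:26257/defaultdb?sslmode=disable"
	perNode, err := withHost(base, "10.0.0.7:26257")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(perNode) // postgresql://root@10.0.0.7:26257/defaultdb?sslmode=disable
}
```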
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
@@ -56,6 +56,7 @@ go_library(
"statement_diagnostics_requests.go",
"statements.go",
"status.go",
"status_util.go",
"sticky_engine.go",
"tcp_keepalive_manager.go",
"tenant.go",