cli: Fix debug zip for storage nodes
This commit addresses a bug where the nodes.json
and status.json data for storage servers was not
being populated correctly. Due to a recent switch
to the NodesList API, the nodes and status JSON
files were missing information such as store
metrics and store status for storage nodes.
The bug has been addressed by reverting to the
Nodes API when the debug zip command is run
against a storage server.
We will continue to use the NodesList API for
SQL-only servers, since we want to filter out
storage-specific information for them.

Release note: None
rimadeodhar committed Feb 23, 2022
1 parent 79ee8d0 commit 8227fde
Showing 4 changed files with 122 additions and 68 deletions.
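
For context on why the NodesList switch lost data: the two status RPCs return very different payloads. The sketch below contrasts them using simplified stand-in types; the field lists are abridged and approximate, and the authoritative definitions are the serverpb and statuspb protobufs.

package main

import "fmt"

// nodeDetails approximates a serverpb.NodeDetails entry, the only thing
// NodesList returns: connection details, nothing else.
type nodeDetails struct {
	NodeID              int32
	Address, SQLAddress string
}

// nodeStatus approximates a statuspb.NodeStatus entry returned by Nodes:
// full per-node status, including the store metrics and store status
// that nodes.json and status.json were missing for storage nodes.
type nodeStatus struct {
	Desc          nodeDetails
	Metrics       map[string]float64   // node-level metrics
	StoreStatuses []map[string]float64 // per-store metrics/status
}

func main() {
	thin := nodeDetails{NodeID: 1, Address: "n1:26257", SQLAddress: "n1:26257"}
	full := nodeStatus{
		Desc:          thin,
		Metrics:       map[string]float64{"sql.conns": 3},
		StoreStatuses: []map[string]float64{{"capacity": 64 << 30}},
	}
	fmt.Printf("NodesList entry: %+v\n", thin)
	fmt.Printf("Nodes entry carries %d store status(es)\n", len(full.StoreStatuses))
}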
46 changes: 31 additions & 15 deletions pkg/cli/zip.go
@@ -25,6 +25,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/server/heapprofiler"
 	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+	"github.com/cockroachdb/cockroach/pkg/server/status/statuspb"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
 	"github.com/cockroachdb/cockroach/pkg/util/contextutil"
 	"github.com/cockroachdb/errors"
@@ -94,13 +95,25 @@ func (zc *debugZipContext) runZipRequest(ctx context.Context, zr *zipReporter, r
 // forAllNodes runs fn on every node, possibly concurrently.
 func (zc *debugZipContext) forAllNodes(
 	ctx context.Context,
-	nodeList []serverpb.NodeDetails,
-	fn func(ctx context.Context, node serverpb.NodeDetails) error,
+	ni nodesInfo,
+	fn func(ctx context.Context, nodeDetails serverpb.NodeDetails, nodeStatus *statuspb.NodeStatus) error,
 ) error {
+	if ni.nodesListResponse == nil {
+		// Nothing to do, return.
+		return errors.AssertionFailedf("nodes list is empty")
+	}
+	if ni.nodesStatusResponse != nil && len(ni.nodesStatusResponse.Nodes) != len(ni.nodesListResponse.Nodes) {
+		return errors.AssertionFailedf("mismatching node status response and node list")
+	}
 	if zipCtx.concurrency == 1 {
 		// Sequential case. Simplify.
-		for _, node := range nodeList {
-			if err := fn(ctx, node); err != nil {
+		for index, nodeDetails := range ni.nodesListResponse.Nodes {
+			var nodeStatus *statuspb.NodeStatus
+			// nodeStatusResponse is expected to be nil for SQL only servers.
+			if ni.nodesStatusResponse != nil {
+				nodeStatus = &ni.nodesStatusResponse.Nodes[index]
+			}
+			if err := fn(ctx, nodeDetails, nodeStatus); err != nil {
 				return err
 			}
 		}
@@ -110,27 +123,31 @@
 	// Multiple nodes concurrently.
 
 	// nodeErrs collects the individual error objects.
-	nodeErrs := make(chan error, len(nodeList))
+	nodeErrs := make(chan error, len(ni.nodesListResponse.Nodes))
 	// The wait group to wait for all concurrent collectors.
 	var wg sync.WaitGroup
-	for _, node := range nodeList {
+	for index, nodeDetails := range ni.nodesListResponse.Nodes {
 		wg.Add(1)
-		go func(node serverpb.NodeDetails) {
+		var nodeStatus *statuspb.NodeStatus
+		if ni.nodesStatusResponse != nil {
+			nodeStatus = &ni.nodesStatusResponse.Nodes[index]
+		}
+		go func(nodeDetails serverpb.NodeDetails, nodeStatus *statuspb.NodeStatus) {
 			defer wg.Done()
 			if err := zc.sem.Acquire(ctx, 1); err != nil {
 				nodeErrs <- err
 				return
 			}
 			defer zc.sem.Release(1)
 
-			nodeErrs <- fn(ctx, node)
-		}(node)
+			nodeErrs <- fn(ctx, nodeDetails, nodeStatus)
+		}(nodeDetails, nodeStatus)
 	}
 	wg.Wait()
 
 	// The final error.
 	var err error
-	for range nodeList {
+	for range ni.nodesListResponse.Nodes {
 		err = errors.CombineErrors(err, <-nodeErrs)
 	}
 	return err
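
The concurrent branch above follows a standard bounded fan-out shape: a weighted semaphore caps in-flight work, a buffered channel collects one error per node, and the per-node status is bound outside the goroutine and passed as an argument so the closure does not capture the loop variables. Below is a self-contained sketch of the same pattern under assumed names (fanOut is a generic stand-in, not the cli code; errors.Join plays the role of errors.CombineErrors here).

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

// fanOut runs fn once per item, with at most maxConc calls in flight.
func fanOut(ctx context.Context, n int, maxConc int64, fn func(context.Context, int) error) error {
	sem := semaphore.NewWeighted(maxConc)
	errs := make(chan error, n) // buffered: senders never block
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) { // pass i as an argument, as the diff passes nodeDetails/nodeStatus
			defer wg.Done()
			if err := sem.Acquire(ctx, 1); err != nil {
				errs <- err // context cancelled while waiting for a slot
				return
			}
			defer sem.Release(1)
			errs <- fn(ctx, i)
		}(i)
	}
	wg.Wait()
	var err error
	for j := 0; j < n; j++ {
		err = errors.Join(err, <-errs) // combine all per-item errors
	}
	return err
}

func main() {
	err := fanOut(context.Background(), 4, 2, func(_ context.Context, i int) error {
		fmt.Println("collecting data from node", i+1)
		return nil
	})
	fmt.Println("combined error:", err)
}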
@@ -231,20 +248,19 @@ func runDebugZip(_ *cobra.Command, args []string) (retErr error) {
 	// For a SQL only server, the nodeList will be a list of SQL nodes
 	// and livenessByNodeID is null. For a KV server, the nodeList will
 	// be a list of KV nodes along with the corresponding node liveness data.
-	nodeList, livenessByNodeID, err := zc.collectClusterData(ctx, firstNodeDetails)
+	ni, livenessByNodeID, err := zc.collectClusterData(ctx, firstNodeDetails)
 	if err != nil {
 		return err
 	}
 
 	// Collect the CPU profiles, before the other per-node requests
 	// below possibly influences the nodes and thus CPU profiles.
-	if err := zc.collectCPUProfiles(ctx, nodeList, livenessByNodeID); err != nil {
+	if err := zc.collectCPUProfiles(ctx, ni, livenessByNodeID); err != nil {
 		return err
 	}
 
 	// Collect the per-node data.
-	if err := zc.forAllNodes(ctx, nodeList, func(ctx context.Context, node serverpb.NodeDetails) error {
-		return zc.collectPerNodeData(ctx, node, livenessByNodeID)
+	if err := zc.forAllNodes(ctx, ni, func(ctx context.Context, nodeDetails serverpb.NodeDetails, nodesStatus *statuspb.NodeStatus) error {
+		return zc.collectPerNodeData(ctx, nodeDetails, nodesStatus, livenessByNodeID)
 	}); err != nil {
 		return err
 	}
111 changes: 67 additions & 44 deletions pkg/cli/zip_cluster_wide.go
@@ -112,35 +112,25 @@ var debugZipTablesPerCluster = []string{
 	"crdb_internal.table_indexes",
 }
 
-// getNodesList constructs a NodesListResponse using the Nodes API. We need this while building
-// the nodes list for older servers that don't support the new NodesList API.
-func (zc *debugZipContext) getNodesList(ctx context.Context) (*serverpb.NodesListResponse, error) {
-	nodes, err := zc.status.Nodes(ctx, &serverpb.NodesRequest{})
-	if err != nil {
-		return nil, err
-	}
-	nodesList := &serverpb.NodesListResponse{}
-	for _, node := range nodes.Nodes {
-		nodeDetails := serverpb.NodeDetails{
-			NodeID:     int32(node.Desc.NodeID),
-			Address:    node.Desc.Address,
-			SQLAddress: node.Desc.SQLAddress,
-		}
-		nodesList.Nodes = append(nodesList.Nodes, nodeDetails)
-	}
-	return nodesList, nil
+// nodesInfo holds node details pulled from a SQL or storage node.
+// SQL only servers will only return nodesListResponse for all SQL nodes.
+// Storage servers will return both nodesListResponse and nodesStatusResponse
+// for all storage nodes.
+type nodesInfo struct {
+	nodesStatusResponse *serverpb.NodesResponse
+	nodesListResponse   *serverpb.NodesListResponse
 }
 
 // collectClusterData runs the data collection that only needs to
 // occur once for the entire cluster.
 func (zc *debugZipContext) collectClusterData(
 	ctx context.Context, firstNodeDetails *serverpb.DetailsResponse,
-) (nodeList []serverpb.NodeDetails, livenessByNodeID nodeLivenesses, err error) {
+) (ni nodesInfo, livenessByNodeID nodeLivenesses, err error) {
 	clusterWideZipRequests := makeClusterWideZipRequests(zc.admin, zc.status)
 
 	for _, r := range clusterWideZipRequests {
 		if err := zc.runZipRequest(ctx, zc.clusterPrinter, r); err != nil {
-			return nil, nil, err
+			return nodesInfo{}, nil, err
 		}
 	}

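The struct encodes the invariant the rest of the change relies on: nodesStatusResponse is nil exactly when the target is a SQL-only server. A hypothetical helper (not part of the commit, shown only to make the contract explicit) would read:

// isStorageServer reports whether the debug zip target is a storage (KV)
// server; per the nodesInfo contract above, SQL-only servers never carry
// a NodesResponse. Illustrative only — this helper is not in the commit.
func isStorageServer(ni nodesInfo) bool {
	return ni.nodesStatusResponse != nil
}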
@@ -150,39 +140,38 @@
 			query = override
 		}
 		if err := zc.dumpTableDataForZip(zc.clusterPrinter, zc.firstNodeSQLConn, debugBase, table, query); err != nil {
-			return nil, nil, errors.Wrapf(err, "fetching %s", table)
+			return nodesInfo{}, nil, errors.Wrapf(err, "fetching %s", table)
 		}
 	}
 
 	{
-		var nodes *serverpb.NodesListResponse
 		s := zc.clusterPrinter.start("requesting nodes")
 		err := zc.runZipFn(ctx, s, func(ctx context.Context) error {
-			nodes, err = zc.status.NodesList(ctx, &serverpb.NodesListRequest{})
-			if code := status.Code(errors.Cause(err)); code == codes.Unimplemented {
-				// Fallback to the old Nodes API; this could occur while connecting to
-				// an older node which does not have the NodesList API implemented.
-				nodes, err = zc.getNodesList(ctx)
-			}
+			ni, err = zc.nodesInfo(ctx)
 			return err
 		})
-		if cErr := zc.z.createJSONOrError(s, debugBase+"/nodes.json", nodes, err); cErr != nil {
-			return nil, nil, cErr
+		if ni.nodesStatusResponse != nil {
+			if cErr := zc.z.createJSONOrError(s, debugBase+"/nodes.json", ni.nodesStatusResponse, err); cErr != nil {
+				return nodesInfo{}, nil, cErr
+			}
+		} else {
+			if cErr := zc.z.createJSONOrError(s, debugBase+"/nodes.json", ni.nodesListResponse, err); cErr != nil {
+				return nodesInfo{}, nil, cErr
+			}
 		}
 
-		// In case nodes came up back empty (the Nodes() RPC failed), we
-		// still want to inspect the per-node endpoints on the head
-		// node. As per the above, we were able to connect at least to
-		// that.
-		nodeList = []serverpb.NodeDetails{{
-			NodeID:     int32(firstNodeDetails.NodeID),
-			Address:    firstNodeDetails.Address,
-			SQLAddress: firstNodeDetails.SQLAddress,
-		}}
-
-		if nodes != nil {
-			// If the nodes were found, use that instead.
-			nodeList = nodes.Nodes
+		if ni.nodesListResponse == nil {
+			// In case nodes came up back empty (the Nodes()/NodesList() RPC failed), we
+			// still want to inspect the per-node endpoints on the head
+			// node. As per the above, we were able to connect at least to
+			// that.
+			ni.nodesListResponse = &serverpb.NodesListResponse{
+				Nodes: []serverpb.NodeDetails{{
+					NodeID:     int32(firstNodeDetails.NodeID),
+					Address:    firstNodeDetails.Address,
+					SQLAddress: firstNodeDetails.SQLAddress,
+				}},
+			}
 		}
 
 		// We'll want livenesses to decide whether a node is decommissioned.
@@ -193,12 +182,46 @@
 			return err
 		})
 		if cErr := zc.z.createJSONOrError(s, livenessName+".json", nodes, err); cErr != nil {
-			return nil, nil, cErr
+			return nodesInfo{}, nil, cErr
 		}
 		livenessByNodeID = map[roachpb.NodeID]livenesspb.NodeLivenessStatus{}
 		if lresponse != nil {
 			livenessByNodeID = lresponse.Statuses
 		}
 	}
-	return nodeList, livenessByNodeID, nil
+	return ni, livenessByNodeID, nil
 }
+
+// nodesInfo constructs debug data for all nodes for the debug zip output.
+// For SQL only servers, only the NodesListResponse is populated.
+// For regular storage servers, the more detailed NodesResponse is
+// returned along with the nodesListResponse.
+func (zc *debugZipContext) nodesInfo(ctx context.Context) (ni nodesInfo, _ error) {
+	nodesResponse, err := zc.status.Nodes(ctx, &serverpb.NodesRequest{})
+	nodesList := &serverpb.NodesListResponse{}
+	if code := status.Code(errors.Cause(err)); code == codes.Unimplemented {
+		// Likely a SQL only server; try the NodesList endpoint.
+		nodesList, err = zc.status.NodesList(ctx, &serverpb.NodesListRequest{})
+	}
+	if err != nil {
+		return nodesInfo{}, err
+	}
+	if nodesResponse != nil {
+		// Build a nodesListResponse from the nodes data. nodesListResponse is needed
+		// further downstream to perform other debug zip related functionality such as
+		// collecting per node debug data. This will only be executed for storage
+		// servers.
+		for _, node := range nodesResponse.Nodes {
+			nodeDetails := serverpb.NodeDetails{
+				NodeID:     int32(node.Desc.NodeID),
+				Address:    node.Desc.Address,
+				SQLAddress: node.Desc.SQLAddress,
+			}
+			nodesList.Nodes = append(nodesList.Nodes, nodeDetails)
+		}
+	}
+	ni.nodesListResponse = nodesList
+	ni.nodesStatusResponse = nodesResponse
+
+	return ni, nil
+}
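
The fallback above hinges on gRPC's codes.Unimplemented: an older or SQL-only server rejects the Nodes RPC with that code, and only then is NodesList tried. (The errors.Cause unwrap matters because cockroachdb/errors may have wrapped the gRPC status by the time it is inspected.) A minimal standalone sketch of the pattern follows, with made-up function names; the commit's real pair is Nodes falling back to NodesList.

package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// withFallback calls preferred and, only if the server answers
// codes.Unimplemented, retries with fallback.
func withFallback(
	ctx context.Context,
	preferred, fallback func(context.Context) (string, error),
) (string, error) {
	out, err := preferred(ctx)
	if status.Code(err) == codes.Unimplemented {
		return fallback(ctx)
	}
	return out, err
}

func main() {
	nodes := func(context.Context) (string, error) {
		// Simulate a server that does not implement the RPC.
		return "", status.Error(codes.Unimplemented, "unknown method Nodes")
	}
	nodesList := func(context.Context) (string, error) {
		return "NodesListResponse", nil
	}
	out, err := withFallback(context.Background(), nodes, nodesList)
	fmt.Println(out, err) // NodesListResponse <nil>
}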
32 changes: 23 additions & 9 deletions pkg/cli/zip_per_node.go
@@ -23,6 +23,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+	"github.com/cockroachdb/cockroach/pkg/server/status/statuspb"
 	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/contextutil"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -91,9 +92,8 @@ var debugZipTablesPerNode = []string{
 //
 // This is called first and in isolation, before other zip operations
 // possibly influence the nodes.
-// TODO(rima): Collect profiles for tenant SQL nodes.
 func (zc *debugZipContext) collectCPUProfiles(
-	ctx context.Context, nodeList []serverpb.NodeDetails, livenessByNodeID nodeLivenesses,
+	ctx context.Context, ni nodesInfo, livenessByNodeID nodeLivenesses,
 ) error {
 	if zipCtx.cpuProfDuration <= 0 {
 		// Nothing to do; return early.
@@ -108,6 +108,11 @@
 
 	zc.clusterPrinter.info("requesting CPU profiles")
 
+	if ni.nodesListResponse == nil {
+		return errors.AssertionFailedf("nodes list is empty; nothing to do")
+	}
+
+	nodeList := ni.nodesListResponse.Nodes
 	// NB: this takes care not to produce non-deterministic log output.
 	resps := make([]profData, len(nodeList))
 	for i := range nodeList {
@@ -164,9 +169,12 @@
 }
 
 func (zc *debugZipContext) collectPerNodeData(
-	ctx context.Context, node serverpb.NodeDetails, livenessByNodeID nodeLivenesses,
+	ctx context.Context,
+	nodeDetails serverpb.NodeDetails,
+	nodeStatus *statuspb.NodeStatus,
+	livenessByNodeID nodeLivenesses,
 ) error {
-	nodeID := roachpb.NodeID(node.NodeID)
+	nodeID := roachpb.NodeID(nodeDetails.NodeID)
 
 	if livenessByNodeID != nil {
 		liveness := livenessByNodeID[nodeID]
@@ -195,9 +203,15 @@
 		}
 		return nil
 	}
 
-	if err := zc.z.createJSON(nodePrinter.start("node status"), prefix+"/status.json", node); err != nil {
-		return err
+	if nodeStatus != nil {
+		// Use nodeStatus to populate the status.json file as it contains more data for a KV node.
+		if err := zc.z.createJSON(nodePrinter.start("node status"), prefix+"/status.json", *nodeStatus); err != nil {
+			return err
+		}
+	} else {
+		if err := zc.z.createJSON(nodePrinter.start("node status"), prefix+"/status.json", nodeDetails); err != nil {
+			return err
+		}
 	}
 
 	// Don't use sqlConn because that's only for is the node `debug
@@ -207,9 +221,9 @@
 	// not work and if it doesn't, we let the invalid curSQLConn get
 	// used anyway so that anything that does *not* need it will
 	// still happen.
-	sqlAddr := node.SQLAddress
+	sqlAddr := nodeDetails.SQLAddress
 	if sqlAddr.IsEmpty() {
-		sqlAddr = node.Address
+		sqlAddr = nodeDetails.Address
 	}
 	curSQLConn := guessNodeURL(zc.firstNodeSQLConn.GetURL(), sqlAddr.AddressField)
 	nodePrinter.info("using SQL connection URL: %s", curSQLConn.GetURL())
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
@@ -56,6 +56,7 @@ go_library(
         "statement_diagnostics_requests.go",
         "statements.go",
        "status.go",
+        "status_local_file_retrieval.go",
        "sticky_engine.go",
        "tcp_keepalive_manager.go",
        "tenant.go",
