Skip to content

Commit

Permalink
server: add endpoint to get # replicas for each (node, table) pair
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
Pete Vilter committed Apr 6, 2018
1 parent a08b56d commit dec97ac
Show file tree
Hide file tree
Showing 8 changed files with 1,838 additions and 218 deletions.
151 changes: 151 additions & 0 deletions pkg/server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
Expand Down Expand Up @@ -1359,6 +1360,156 @@ func (s *adminServer) Decommission(
return s.DecommissionStatus(ctx, &serverpb.DecommissionStatusRequest{NodeIDs: nodeIDs})
}

// ReplicaMatrix returns a count of replicas on each node for each table.
func (s *adminServer) ReplicaMatrix(
ctx context.Context, req *serverpb.ReplicaMatrixRequest,
) (*serverpb.ReplicaMatrixResponse, error) {
args := sql.SessionArgs{User: s.getUser(req)}
ctx, session := s.NewContextAndSessionForRPC(ctx, args)
defer session.Finish(s.server.sqlExecutor)

resp := &serverpb.ReplicaMatrixResponse{
DatabaseInfo: make(map[string]serverpb.ReplicaMatrixResponse_DatabaseInfo),
ZoneConfigs: make(map[int64]serverpb.ReplicaMatrixResponse_ZoneConfig),
}

// Get ids and names for databases and tables.
// Set up this structure in the response.

// This relies on crdb_internal.tables returning data even for newly added tables
// and deleted tables (as opposed to e.g. information_schema) because we are interested
// in the data for all ranges, not just ranges for visible tables.
tablesQuery := `SELECT name, table_id, database_name, parent_id FROM "".crdb_internal.tables`
r1, err := s.server.sqlExecutor.ExecuteStatementsBuffered(session, tablesQuery, nil, 1)
if err != nil {
return nil, s.serverError(err)
}
defer r1.Close(ctx)

// Used later when we're scanning Meta2 and only have IDs, not names.
tableInfosByTableID := map[uint64]serverpb.ReplicaMatrixResponse_TableInfo{}

rows1 := r1.ResultList[0].Rows
for idx := 0; idx < rows1.Len(); idx++ {
row := rows1.At(idx)

tableName := (*string)(row[0].(*tree.DString))
tableID := uint64(tree.MustBeDInt(row[1]))
dbName := (*string)(row[2].(*tree.DString))

// Insert database if it doesn't exist.
dbInfo, ok := resp.DatabaseInfo[*dbName]
if !ok {
dbInfo = serverpb.ReplicaMatrixResponse_DatabaseInfo{
TableInfo: make(map[string]serverpb.ReplicaMatrixResponse_TableInfo),
}
resp.DatabaseInfo[*dbName] = dbInfo
}

// Get zone config for table.
zoneConfigQuery := fmt.Sprintf(
`SELECT id, cli_specifier FROM [EXPERIMENTAL SHOW ZONE CONFIGURATION FOR TABLE %s.%s]`,
(*tree.Name)(dbName), (*tree.Name)(tableName),
)
r, err := s.server.sqlExecutor.ExecuteStatementsBuffered(session, zoneConfigQuery, nil, 1)
if err != nil {
return nil, s.serverError(err)
}
defer r.Close(ctx)

rows := r.ResultList[0].Rows
if rows.Len() != 1 {
return nil, s.serverError(fmt.Errorf(
"could not get zone config for table %s; %d rows returned", *tableName, rows.Len(),
))
}

zcRow := rows.At(0)
zcID := int64(tree.MustBeDInt(zcRow[0]))

// Insert table.
tableInfo := serverpb.ReplicaMatrixResponse_TableInfo{
ReplicaCountByNodeId: make(map[roachpb.NodeID]int64),
ZoneConfigId: zcID,
}
dbInfo.TableInfo[*tableName] = tableInfo
tableInfosByTableID[tableID] = tableInfo
}

// Get replica counts.
if err := s.server.db.Txn(ctx, func(txnCtx context.Context, txn *client.Txn) error {
acct := s.memMonitor.MakeBoundAccount()
defer acct.Close(txnCtx)

kvs, err := sql.ScanMetaKVs(ctx, txn, roachpb.Span{
Key: keys.UserTableDataMin,
EndKey: keys.MaxKey,
})
if err != nil {
return err
}

// Group replicas by table and node, accumulate counts.
var rangeDesc roachpb.RangeDescriptor
for _, kv := range kvs {
if err := acct.Grow(txnCtx, int64(len(kv.Key)+len(kv.Value.RawBytes))); err != nil {
return err
}
if err := kv.ValueProto(&rangeDesc); err != nil {
return err
}

_, tableID, err := keys.DecodeTablePrefix(rangeDesc.StartKey.AsRawKey())
if err != nil {
return err
}

for _, replicaDesc := range rangeDesc.Replicas {
tableInfo, ok := tableInfosByTableID[tableID]
if !ok {
// This is a database, skip.
continue
}
tableInfo.ReplicaCountByNodeId[replicaDesc.NodeID]++
}
}
return nil
}); err != nil {
return nil, s.serverError(err)
}

// Get zone configs.
// TODO(vilterp): this can be done in parallel with getting table/db names and replica counts.
zoneConfigsQuery := `EXPERIMENTAL SHOW ALL ZONE CONFIGURATIONS`
r2, err := s.server.sqlExecutor.ExecuteStatementsBuffered(session, zoneConfigsQuery, nil, 1)
if err != nil {
return nil, s.serverError(err)
}
defer r2.Close(ctx)

rows2 := r2.ResultList[0].Rows
for idx := 0; idx < rows2.Len(); idx++ {
row := rows2.At(idx)

zcID := int64(tree.MustBeDInt(row[0]))
zcCliSpecifier := string(tree.MustBeDString(row[1]))
zcYaml := tree.MustBeDBytes(row[2])
zcBytes := tree.MustBeDBytes(row[3])
var zcProto config.ZoneConfig
if err := protoutil.Unmarshal([]byte(zcBytes), &zcProto); err != nil {
return nil, s.serverError(err)
}

resp.ZoneConfigs[zcID] = serverpb.ReplicaMatrixResponse_ZoneConfig{
CliSpecifier: zcCliSpecifier,
Config: zcProto,
ConfigYaml: string(zcYaml),
}
}

return resp, nil
}

// sqlQuery allows you to incrementally build a SQL query that uses
// placeholders. Instead of specific placeholders like $1, you instead use the
// temporary placeholder $.
Expand Down
131 changes: 131 additions & 0 deletions pkg/server/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1312,3 +1312,134 @@ func TestAdminAPIFullRangeLog(t *testing.T) {
})
}
}

func TestAdminAPIReplicaMatrix(t *testing.T) {
defer leaktest.AfterTest(t)()
testCluster := serverutils.StartTestCluster(t, 3, base.TestClusterArgs{})
defer testCluster.Stopper().Stop(context.Background())

firstServer := testCluster.Server(0)
sqlDB := sqlutils.MakeSQLRunner(testCluster.ServerConn(0))

// Create some tables.
sqlDB.Exec(t, `CREATE DATABASE roachblog`)
sqlDB.Exec(t, `CREATE TABLE roachblog.posts (id INT PRIMARY KEY, title text, body text)`)
sqlDB.Exec(t, `CREATE TABLE roachblog.comments (
id INT PRIMARY KEY,
post_id INT REFERENCES roachblog.posts,
body text
)`)
// Test special characters in DB and table names.
sqlDB.Exec(t, `CREATE DATABASE "spec'chars"`)
sqlDB.Exec(t, `CREATE TABLE "spec'chars"."more'spec'chars" (id INT PRIMARY KEY)`)

sqlDB.CheckQueryResults(
t,
`SELECT table_id, name, parent_id
FROM crdb_internal.tables WHERE database_name = 'roachblog'
ORDER BY table_id`,
[][]string{
{"51", "posts", "50"},
{"52", "comments", "50"},
},
)

// Verify that we see their replicas in the ReplicaMatrix response, evenly spread
// across the test cluster's three nodes.

expectedResp := map[string]serverpb.ReplicaMatrixResponse_DatabaseInfo{
"roachblog": {
TableInfo: map[string]serverpb.ReplicaMatrixResponse_TableInfo{
"posts": {
ReplicaCountByNodeId: map[roachpb.NodeID]int64{
1: 1,
2: 1,
3: 1,
},
},
"comments": {
ReplicaCountByNodeId: map[roachpb.NodeID]int64{
1: 1,
2: 1,
3: 1,
},
},
},
},
"spec'chars": {
TableInfo: map[string]serverpb.ReplicaMatrixResponse_TableInfo{
"more'spec'chars": {
ReplicaCountByNodeId: map[roachpb.NodeID]int64{
1: 1,
2: 1,
3: 1,
},
},
},
},
}

// Wait for the new tables' ranges to be created and replicated.
testutils.SucceedsSoon(t, func() error {
var resp serverpb.ReplicaMatrixResponse
if err := getAdminJSONProto(firstServer, "replica_matrix", &resp); err != nil {
t.Fatal(err)
}

delete(resp.DatabaseInfo, "system") // delete results for system database.
if !reflect.DeepEqual(resp.DatabaseInfo, expectedResp) {
return fmt.Errorf("expected %v; got %v", expectedResp, resp.DatabaseInfo)
}

// Don't test anything about the zone configs for now; just verify that something is there.
if len(resp.ZoneConfigs) == 0 {
return fmt.Errorf("no zone configs returned")
}

return nil
})

// Add a zone config.
sqlDB.Exec(t, `ALTER TABLE roachblog.posts EXPERIMENTAL CONFIGURE ZONE 'num_replicas: 1'`)

expectedNewZoneConfigID := int64(51)
sqlDB.CheckQueryResults(
t,
`SELECT id
FROM [EXPERIMENTAL SHOW ALL ZONE CONFIGURATIONS]
WHERE cli_specifier = 'roachblog.posts'`,
[][]string{
{fmt.Sprintf("%d", expectedNewZoneConfigID)},
},
)

// Verify that we see the zone config and its effects.
testutils.SucceedsSoon(t, func() error {
var resp serverpb.ReplicaMatrixResponse
if err := getAdminJSONProto(firstServer, "replica_matrix", &resp); err != nil {
t.Fatal(err)
}

postsTableInfo := resp.DatabaseInfo["roachblog"].TableInfo["posts"]

// Verify that the TableInfo for roachblog.posts points at the new zone config.
if postsTableInfo.ZoneConfigId != expectedNewZoneConfigID {
t.Fatalf(
"expected roachblog.posts to have zone config id %d; had %d",
expectedNewZoneConfigID, postsTableInfo.ZoneConfigId,
)
}

// Verify that the num_replicas setting has taken effect.
numPostsReplicas := int64(0)
for _, count := range postsTableInfo.ReplicaCountByNodeId {
numPostsReplicas += count
}

if numPostsReplicas != 1 {
return fmt.Errorf("expected 1 replica; got %d", numPostsReplicas)
}

return nil
})
}
Loading

0 comments on commit dec97ac

Please sign in to comment.