Skip to content

Commit

Permalink
Add /api/schemas endpoint to vtadmin-api
Browse files Browse the repository at this point in the history
Signed-off-by: Sara Bee <[email protected]>
  • Loading branch information
doeg committed Feb 2, 2021
1 parent 0ffb8c1 commit 8bf8622
Show file tree
Hide file tree
Showing 6 changed files with 965 additions and 53 deletions.
287 changes: 234 additions & 53 deletions go/vt/proto/vtadmin/vtadmin.pb.go

Large diffs are not rendered by default.

149 changes: 149 additions & 0 deletions go/vt/vtadmin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func NewAPI(clusters []*cluster.Cluster, opts grpcserver.Options, httpOpts vtadm
router.HandleFunc("/clusters", httpAPI.Adapt(vtadminhttp.GetClusters)).Name("API.GetClusters")
router.HandleFunc("/gates", httpAPI.Adapt(vtadminhttp.GetGates)).Name("API.GetGates")
router.HandleFunc("/keyspaces", httpAPI.Adapt(vtadminhttp.GetKeyspaces)).Name("API.GetKeyspaces")
router.HandleFunc("/schemas", httpAPI.Adapt(vtadminhttp.GetSchemas)).Name("API.GetSchemas")
router.HandleFunc("/tablets", httpAPI.Adapt(vtadminhttp.GetTablets)).Name("API.GetTablets")
router.HandleFunc("/tablet/{tablet}", httpAPI.Adapt(vtadminhttp.GetTablet)).Name("API.GetTablet")

Expand Down Expand Up @@ -242,6 +243,154 @@ func (api *API) GetKeyspaces(ctx context.Context, req *vtadminpb.GetKeyspacesReq
}, nil
}

// GetSchemas is part of the vtadminpb.VTAdminServer interface.
func (api *API) GetSchemas(ctx context.Context, req *vtadminpb.GetSchemasRequest) (*vtadminpb.GetSchemasResponse, error) {
span, ctx := trace.NewSpan(ctx, "API.GetSchemas")
defer span.Finish()

clusters, _ := api.getClustersForRequest(req.ClusterIds)

var (
schemas []*vtadminpb.Schema
wg sync.WaitGroup
er concurrency.AllErrorRecorder
m sync.Mutex
)

for _, c := range clusters {
wg.Add(1)

// Get schemas for the cluster
go func(c *cluster.Cluster) {
defer wg.Done()

// Since tablets are per-cluster, we can fetch them once
// and use them throughout the other waitgroups.
tablets, err := api.getTablets(ctx, c)
if err != nil {
er.RecordError(err)
return
}

ss, err := api.getSchemas(ctx, c, tablets)
if err != nil {
er.RecordError(err)
return
}

m.Lock()
schemas = append(schemas, ss...)
m.Unlock()
}(c)
}

wg.Wait()

if er.HasErrors() {
return nil, er.Error()
}

return &vtadminpb.GetSchemasResponse{
Schemas: schemas,
}, nil
}

// getSchemas returns all of the schemas across all keyspaces in the given cluster.
func (api *API) getSchemas(ctx context.Context, c *cluster.Cluster, tablets []*vtadminpb.Tablet) ([]*vtadminpb.Schema, error) {
if err := c.Vtctld.Dial(ctx); err != nil {
return nil, err
}

resp, err := c.Vtctld.GetKeyspaces(ctx, &vtctldatapb.GetKeyspacesRequest{})
if err != nil {
return nil, err
}

var (
schemas []*vtadminpb.Schema
wg sync.WaitGroup
er concurrency.AllErrorRecorder
m sync.Mutex
)

for _, ks := range resp.Keyspaces {
wg.Add(1)

// Get schemas for the cluster/keyspace
go func(c *cluster.Cluster, ks *vtctldatapb.Keyspace) {
defer wg.Done()

ss, err := api.getSchemasForKeyspace(ctx, c, ks, tablets)
if err != nil {
er.RecordError(err)
return
}

// Ignore keyspaces without schemas
if ss == nil {
return
}

m.Lock()
schemas = append(schemas, ss)
m.Unlock()
}(c, ks)
}

wg.Wait()

if er.HasErrors() {
return nil, er.Error()
}

return schemas, nil
}

func (api *API) getSchemasForKeyspace(ctx context.Context, c *cluster.Cluster, ks *vtctldatapb.Keyspace, tablets []*vtadminpb.Tablet) (*vtadminpb.Schema, error) {
// Choose the first serving tablet.
var kt *vtadminpb.Tablet
for _, t := range tablets {
if t.Tablet.Keyspace != ks.Name || t.State != vtadminpb.Tablet_SERVING {
continue
}

kt = t
}

// Skip schema lookup on this keyspace if there are no serving tablets.
if kt == nil {
return nil, nil
}

if err := c.Vtctld.Dial(ctx); err != nil {
return nil, err
}

s, err := c.Vtctld.GetSchema(ctx, &vtctldatapb.GetSchemaRequest{
TabletAlias: kt.Tablet.Alias,
})

if err != nil {
return nil, err
}

// Ignore any schemas without table definitions; otherwise we return
// a vtadminpb.Schema object with only Cluster and Keyspace defined,
// which is not particularly useful.
if s == nil || s.Schema == nil || len(s.Schema.TableDefinitions) == 0 {
return nil, nil
}

return &vtadminpb.Schema{
Cluster: &vtadminpb.Cluster{
Id: c.ID,
Name: c.Name,
},
Keyspace: ks.Name,
TableDefinitions: s.Schema.TableDefinitions,
}, nil
}

// GetTablet is part of the vtadminpb.VTAdminServer interface.
func (api *API) GetTablet(ctx context.Context, req *vtadminpb.GetTabletRequest) (*vtadminpb.Tablet, error) {
span, ctx := trace.NewSpan(ctx, "API.GetTablet")
Expand Down
Loading

0 comments on commit 8bf8622

Please sign in to comment.