Skip to content

Commit

Permalink
Merge pull request #7610 from tinyspeck/am_vtadmin_find_schema
Browse files Browse the repository at this point in the history
[vtadmin] Add FindSchema route
  • Loading branch information
rohit-nayak-ps authored Mar 6, 2021
2 parents 78451c0 + b5b0713 commit 2b18baa
Show file tree
Hide file tree
Showing 11 changed files with 1,511 additions and 129 deletions.
430 changes: 359 additions & 71 deletions go/vt/proto/vtadmin/vtadmin.pb.go

Large diffs are not rendered by default.

77 changes: 77 additions & 0 deletions go/vt/vtadmin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtadmin/cluster"
Expand Down Expand Up @@ -88,6 +89,7 @@ func NewAPI(clusters []*cluster.Cluster, opts grpcserver.Options, httpOpts vtadm
router.HandleFunc("/clusters", httpAPI.Adapt(vtadminhttp.GetClusters)).Name("API.GetClusters")
router.HandleFunc("/gates", httpAPI.Adapt(vtadminhttp.GetGates)).Name("API.GetGates")
router.HandleFunc("/keyspaces", httpAPI.Adapt(vtadminhttp.GetKeyspaces)).Name("API.GetKeyspaces")
router.HandleFunc("/schema/{table}", httpAPI.Adapt(vtadminhttp.FindSchema)).Name("API.FindSchema")
router.HandleFunc("/schema/{cluster_id}/{keyspace}/{table}", httpAPI.Adapt(vtadminhttp.GetSchema)).Name("API.GetSchema")
router.HandleFunc("/schemas", httpAPI.Adapt(vtadminhttp.GetSchemas)).Name("API.GetSchemas")
router.HandleFunc("/tablets", httpAPI.Adapt(vtadminhttp.GetTablets)).Name("API.GetTablets")
Expand Down Expand Up @@ -125,6 +127,81 @@ func (api *API) ListenAndServe() error {
return api.serv.ListenAndServe()
}

// FindSchema is part of the vtadminpb.VTAdminServer interface.
func (api *API) FindSchema(ctx context.Context, req *vtadminpb.FindSchemaRequest) (*vtadminpb.Schema, error) {
span, _ := trace.NewSpan(ctx, "API.FindSchema")
defer span.Finish()

span.Annotate("table", req.Table)

clusters, _ := api.getClustersForRequest(req.ClusterIds)

var (
m sync.Mutex
wg sync.WaitGroup
rec concurrency.AllErrorRecorder
results []*vtadminpb.Schema
)

for _, c := range clusters {
wg.Add(1)

go func(c *cluster.Cluster) {
defer wg.Done()

tablets, err := c.FindTablets(ctx, func(t *vtadminpb.Tablet) bool {
// Filter out all the non-serving tablets once, to make the
// later, per-keyspace filtering slightly faster (fewer
// potentially-redundant iterations).
return t.State == vtadminpb.Tablet_SERVING
}, -1)
if err != nil {
err := fmt.Errorf("could not find any serving tablets for cluster %s: %w", c.ID, err)
rec.RecordError(err)

return
}

schemas, err := api.getSchemas(ctx, c, tablets)
if err != nil {
err := fmt.Errorf("%w: while collecting schemas for cluster %s", err, c.ID)
rec.RecordError(err)

return
}

for _, schema := range schemas {
for _, td := range schema.TableDefinitions {
if td.Name == req.Table {
m.Lock()
results = append(results, schema)
m.Unlock()

return
}
}
}

log.Infof("cluster %s has no tables named %s", c.ID, req.Table)
}(c)
}

wg.Wait()

if rec.HasErrors() {
return nil, rec.Error()
}

switch len(results) {
case 0:
return nil, fmt.Errorf("%w: no schemas found with table named %s", errors.ErrNoSchema, req.Table)
case 1:
return results[0], nil
default:
return nil, fmt.Errorf("%w: %d schemas found with table named %s", errors.ErrAmbiguousSchema, len(results), req.Table)
}
}

// GetClusters is part of the vtadminpb.VTAdminServer interface.
func (api *API) GetClusters(ctx context.Context, req *vtadminpb.GetClustersRequest) (*vtadminpb.GetClustersResponse, error) {
span, _ := trace.NewSpan(ctx, "API.GetClusters")
Expand Down
Loading

0 comments on commit 2b18baa

Please sign in to comment.