Skip to content

Commit

Permalink
Merge pull request #7266 from tinyspeck/am_vtadmin_vtctld
Browse files Browse the repository at this point in the history
[vtadmin] Add vtctld proxy to vtadmin API, add GetKeyspaces endpoint
  • Loading branch information
rohit-nayak-ps authored Jan 15, 2021
2 parents a26cc27 + 00fba1e commit 36110d8
Show file tree
Hide file tree
Showing 27 changed files with 1,593 additions and 180 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
/go/test/endtoend/vtgate @harshit-gangal @systay
/go/vt/discovery @deepthi
/go/vt/mysqlctl @deepthi
/go/vt/proto/vtadmin @ajm188 @doeg
/go/vt/orchestrator @deepthi @shlomi-noach
/go/vt/proto/vtadmin @ajm188 @doeg
/go/vt/schema @shlomi-noach
/go/vt/sqlparser @harshit-gangal @systay
/go/vt/vtadmin @ajm188 @doeg
Expand Down
302 changes: 263 additions & 39 deletions go/vt/proto/vtadmin/vtadmin.pb.go

Large diffs are not rendered by default.

55 changes: 55 additions & 0 deletions go/vt/vtadmin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"vitess.io/vitess/go/vt/vterrors"

vtadminpb "vitess.io/vitess/go/vt/proto/vtadmin"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

Expand Down Expand Up @@ -77,6 +78,7 @@ func NewAPI(clusters []*cluster.Cluster, opts grpcserver.Options, httpOpts vtadm
httpAPI := vtadminhttp.NewAPI(api)

router.HandleFunc("/gates", httpAPI.Adapt(vtadminhttp.GetGates)).Name("API.GetGates")
router.HandleFunc("/keyspaces", httpAPI.Adapt(vtadminhttp.GetKeyspaces)).Name("API.GetKeyspaces")
router.HandleFunc("/tablets", httpAPI.Adapt(vtadminhttp.GetTablets)).Name("API.GetTablets")
router.HandleFunc("/tablet/{tablet}", httpAPI.Adapt(vtadminhttp.GetTablet)).Name("API.GetTablet")

Expand Down Expand Up @@ -154,6 +156,59 @@ func (api *API) GetGates(ctx context.Context, req *vtadminpb.GetGatesRequest) (*
}, nil
}

// GetKeyspaces is part of the vtadminpb.VTAdminServer interface.
func (api *API) GetKeyspaces(ctx context.Context, req *vtadminpb.GetKeyspacesRequest) (*vtadminpb.GetKeyspacesResponse, error) {
span, ctx := trace.NewSpan(ctx, "API.GetKeyspaces")
defer span.Finish()

clusters, _ := api.getClustersForRequest(req.ClusterIds)

var (
keyspaces []*vtadminpb.Keyspace
wg sync.WaitGroup
er concurrency.AllErrorRecorder
m sync.Mutex
)

for _, c := range clusters {
wg.Add(1)

go func(c *cluster.Cluster) {
defer wg.Done()

if err := c.Vtctld.Dial(ctx); err != nil {
er.RecordError(err)
return
}

resp, err := c.Vtctld.GetKeyspaces(ctx, &vtctldatapb.GetKeyspacesRequest{})
if err != nil {
er.RecordError(err)
return
}

m.Lock()
for _, ks := range resp.Keyspaces {
keyspaces = append(keyspaces, &vtadminpb.Keyspace{
Cluster: c.ToProto(),
Keyspace: ks,
})
}
m.Unlock()
}(c)
}

wg.Wait()

if er.HasErrors() {
return nil, er.Error()
}

return &vtadminpb.GetKeyspacesResponse{
Keyspaces: keyspaces,
}, nil
}

// GetTablet is part of the vtadminpb.VTAdminServer interface.
func (api *API) GetTablet(ctx context.Context, req *vtadminpb.GetTabletRequest) (*vtadminpb.Tablet, error) {
span, ctx := trace.NewSpan(ctx, "API.GetTablet")
Expand Down
117 changes: 111 additions & 6 deletions go/vt/vtadmin/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,27 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vitessdriver"
"vitess.io/vitess/go/vt/vtadmin/cluster"
"vitess.io/vitess/go/vt/vtadmin/cluster/discovery/fakediscovery"
"vitess.io/vitess/go/vt/vtadmin/grpcserver"
"vitess.io/vitess/go/vt/vtadmin/http"
vtadminvtctldclient "vitess.io/vitess/go/vt/vtadmin/vtctldclient"
"vitess.io/vitess/go/vt/vtadmin/vtsql"
"vitess.io/vitess/go/vt/vtadmin/vtsql/fakevtsql"
"vitess.io/vitess/go/vt/vtctl/grpcvtctldserver"
"vitess.io/vitess/go/vt/vtctl/grpcvtctldserver/testutil"
"vitess.io/vitess/go/vt/vtctl/vtctldclient"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtadminpb "vitess.io/vitess/go/vt/proto/vtadmin"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
"vitess.io/vitess/go/vt/proto/vttime"
)

func TestGetGates(t *testing.T) {
Expand Down Expand Up @@ -89,6 +99,91 @@ func TestGetGates(t *testing.T) {
assert.Nil(t, resp)
}

func TestGetKeyspaces(t *testing.T) {
ts1 := memorytopo.NewServer("c1_cell1")
ts2 := memorytopo.NewServer("c2_cell1")

testutil.AddKeyspace(context.Background(), t, ts1, &vtctldatapb.Keyspace{
Name: "testkeyspace",
Keyspace: &topodatapb.Keyspace{},
})
testutil.AddKeyspace(context.Background(), t, ts1, &vtctldatapb.Keyspace{
Name: "snapshot",
Keyspace: &topodatapb.Keyspace{
KeyspaceType: topodatapb.KeyspaceType_SNAPSHOT,
BaseKeyspace: "testkeyspace",
SnapshotTime: &vttime.Time{Seconds: 10, Nanoseconds: 1},
},
})

testutil.AddKeyspace(context.Background(), t, ts2, &vtctldatapb.Keyspace{
Name: "customer",
Keyspace: &topodatapb.Keyspace{},
})

testutil.WithTestServer(t, grpcvtctldserver.NewVtctldServer(ts1), func(t *testing.T, cluster1Client vtctldclient.VtctldClient) {
testutil.WithTestServer(t, grpcvtctldserver.NewVtctldServer(ts2), func(t *testing.T, cluster2Client vtctldclient.VtctldClient) {
c1 := buildCluster(1, cluster1Client, nil, nil)
c2 := buildCluster(2, cluster2Client, nil, nil)

api := NewAPI([]*cluster.Cluster{c1, c2}, grpcserver.Options{}, http.Options{})
resp, err := api.GetKeyspaces(context.Background(), &vtadminpb.GetKeyspacesRequest{})
require.NoError(t, err)

expected := &vtadminpb.GetKeyspacesResponse{
Keyspaces: []*vtadminpb.Keyspace{
{
Cluster: &vtadminpb.Cluster{
Id: "c1",
Name: "cluster1",
},
Keyspace: &vtctldatapb.Keyspace{
Name: "testkeyspace",
Keyspace: &topodatapb.Keyspace{},
},
},
{
Cluster: &vtadminpb.Cluster{
Id: "c1",
Name: "cluster1",
},
Keyspace: &vtctldatapb.Keyspace{
Name: "snapshot",
Keyspace: &topodatapb.Keyspace{
KeyspaceType: topodatapb.KeyspaceType_SNAPSHOT,
BaseKeyspace: "testkeyspace",
SnapshotTime: &vttime.Time{Seconds: 10, Nanoseconds: 1},
},
},
},
{
Cluster: &vtadminpb.Cluster{
Id: "c2",
Name: "cluster2",
},
Keyspace: &vtctldatapb.Keyspace{
Name: "customer",
Keyspace: &topodatapb.Keyspace{},
},
},
},
}
assert.ElementsMatch(t, expected.Keyspaces, resp.Keyspaces)

resp, err = api.GetKeyspaces(
context.Background(),
&vtadminpb.GetKeyspacesRequest{
ClusterIds: []string{"c1"},
},
)
require.NoError(t, err)

expected.Keyspaces = expected.Keyspaces[:2] // just c1
assert.ElementsMatch(t, expected.Keyspaces, resp.Keyspaces)
})
})
}

func TestGetTablets(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -250,7 +345,7 @@ func TestGetTablets(t *testing.T) {
clusters := make([]*cluster.Cluster, len(tt.clusterTablets))

for i, tablets := range tt.clusterTablets {
cluster := buildCluster(i, tablets, tt.dbconfigs)
cluster := buildCluster(i, nil, tablets, tt.dbconfigs)
clusters[i] = cluster
}

Expand Down Expand Up @@ -511,7 +606,7 @@ func TestGetTablet(t *testing.T) {
clusters := make([]*cluster.Cluster, len(tt.clusterTablets))

for i, tablets := range tt.clusterTablets {
cluster := buildCluster(i, tablets, tt.dbconfigs)
cluster := buildCluster(i, nil, tablets, tt.dbconfigs)
clusters[i] = cluster
}

Expand All @@ -532,12 +627,13 @@ type dbcfg struct {
shouldErr bool
}

// shared helper for building a cluster that contains the given tablets.
// dbconfigs contains an optional config for controlling the behavior of the
// cluster's DB at the package sql level.
func buildCluster(i int, tablets []*vtadminpb.Tablet, dbconfigs map[string]*dbcfg) *cluster.Cluster {
// shared helper for building a cluster that contains the given tablets and
// talking to the given vtctld server. dbconfigs contains an optional config
// for controlling the behavior of the cluster's DB at the package sql level.
func buildCluster(i int, vtctldClient vtctldclient.VtctldClient, tablets []*vtadminpb.Tablet, dbconfigs map[string]*dbcfg) *cluster.Cluster {
disco := fakediscovery.New()
disco.AddTaggedGates(nil, &vtadminpb.VTGate{Hostname: fmt.Sprintf("cluster%d-gate", i)})
disco.AddTaggedVtctlds(nil, &vtadminpb.Vtctld{Hostname: "doesn't matter"})

cluster := &cluster.Cluster{
ID: fmt.Sprintf("c%d", i),
Expand All @@ -558,7 +654,16 @@ func buildCluster(i int, tablets []*vtadminpb.Tablet, dbconfigs map[string]*dbcf
return sql.OpenDB(&fakevtsql.Connector{Tablets: tablets, ShouldErr: dbconfig.shouldErr}), nil
}

vtctld := vtadminvtctldclient.New(&vtadminvtctldclient.Config{
Cluster: cluster.ToProto(),
Discovery: disco,
})
vtctld.DialFunc = func(addr string, ff grpcclient.FailFast, opts ...grpc.DialOption) (vtctldclient.VtctldClient, error) {
return vtctldClient, nil
}

cluster.DB = db
cluster.Vtctld = vtctld

return cluster
}
21 changes: 15 additions & 6 deletions go/vt/vtadmin/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"

"vitess.io/vitess/go/vt/vtadmin/cluster/discovery"
"vitess.io/vitess/go/vt/vtadmin/vtctldclient"
"vitess.io/vitess/go/vt/vtadmin/vtsql"

vtadminpb "vitess.io/vitess/go/vt/proto/vtadmin"
Expand All @@ -33,10 +34,8 @@ type Cluster struct {
Name string
Discovery discovery.Discovery

// (TODO|@amason): after merging #7128, this still requires some additional
// work, so deferring this for now!
// vtctl vtctldclient.VtctldClient
DB vtsql.DB
DB vtsql.DB
Vtctld vtctldclient.Proxy

// These fields are kept to power debug endpoints.
// (TODO|@amason): Figure out if these are needed or if there's a way to
Expand All @@ -60,14 +59,24 @@ func New(cfg Config) (*Cluster, error) {

cluster.Discovery = disco

protocluster := cluster.ToProto()

vtsqlargs := buildPFlagSlice(cfg.VtSQLFlags)

vtsqlCfg, err := vtsql.Parse(cluster.ToProto(), disco, vtsqlargs)
vtsqlCfg, err := vtsql.Parse(protocluster, disco, vtsqlargs)
if err != nil {
return nil, fmt.Errorf("error while creating vtsql connection config: %w", err)
}

vtctldargs := buildPFlagSlice(cfg.VtctldFlags)

vtctldCfg, err := vtctldclient.Parse(protocluster, disco, vtctldargs)
if err != nil {
return nil, fmt.Errorf("error while creating vtsql connection: %w", err)
return nil, fmt.Errorf("error while creating vtctldclient proxy config: %w", err)
}

cluster.DB = vtsql.New(vtsqlCfg)
cluster.Vtctld = vtctldclient.New(vtctldCfg)

return cluster, nil
}
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vtadmin/cluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Config struct {
DiscoveryImpl string
DiscoveryFlagsByImpl FlagsByImpl
VtSQLFlags map[string]string
VtctldFlags map[string]string
}

// Cluster returns a new cluster instance from the given config.
Expand Down Expand Up @@ -82,6 +83,7 @@ func (cfg Config) Merge(override Config) Config {
DiscoveryImpl: cfg.DiscoveryImpl,
DiscoveryFlagsByImpl: map[string]map[string]string{},
VtSQLFlags: map[string]string{},
VtctldFlags: map[string]string{},
}

if override.ID != "" {
Expand All @@ -104,6 +106,9 @@ func (cfg Config) Merge(override Config) Config {
mergeStringMap(merged.VtSQLFlags, cfg.VtSQLFlags)
mergeStringMap(merged.VtSQLFlags, override.VtSQLFlags)

mergeStringMap(merged.VtctldFlags, cfg.VtctldFlags)
mergeStringMap(merged.VtctldFlags, override.VtctldFlags)

return merged
}

Expand Down
19 changes: 17 additions & 2 deletions go/vt/vtadmin/cluster/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func TestMergeConfig(t *testing.T) {
DiscoveryImpl: "consul",
DiscoveryFlagsByImpl: FlagsByImpl{},
VtSQLFlags: map[string]string{},
VtctldFlags: map[string]string{},
},
},
{
Expand Down Expand Up @@ -81,18 +82,23 @@ func TestMergeConfig(t *testing.T) {
"foo": "baz",
},
},
VtSQLFlags: map[string]string{},
VtSQLFlags: map[string]string{},
VtctldFlags: map[string]string{},
},
},
{
name: "merging vtsql flags",
name: "merging vtsql/vtctld flags",
base: Config{
ID: "c1",
Name: "cluster1",
VtSQLFlags: map[string]string{
"one": "one",
"two": "2",
},
VtctldFlags: map[string]string{
"a": "A",
"b": "B",
},
},
override: Config{
ID: "c1",
Expand All @@ -101,6 +107,10 @@ func TestMergeConfig(t *testing.T) {
"two": "two",
"three": "three",
},
VtctldFlags: map[string]string{
"a": "alpha",
"c": "C",
},
},
expected: Config{
ID: "c1",
Expand All @@ -111,6 +121,11 @@ func TestMergeConfig(t *testing.T) {
"two": "two",
"three": "three",
},
VtctldFlags: map[string]string{
"a": "alpha",
"b": "B",
"c": "C",
},
},
},
}
Expand Down
Loading

0 comments on commit 36110d8

Please sign in to comment.