Skip to content

Commit

Permalink
Updates tsh ls for node/app/db/kube to accept new filter flags (#10980
Browse files Browse the repository at this point in the history
)

* Also adds a search keyword parser that takes in different
  delimiters (comma is used for tsh, space is used for web UI)

part of RFD 55
  • Loading branch information
kimlisa committed Mar 9, 2022
1 parent 3ec5d11 commit bf8095d
Show file tree
Hide file tree
Showing 13 changed files with 476 additions and 100 deletions.
117 changes: 80 additions & 37 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,24 +877,24 @@ func (c *Client) UpsertKubeServiceV2(ctx context.Context, s types.Server) (*type
// GetKubeServices returns the list of kubernetes services registered in the
// cluster.
func (c *Client) GetKubeServices(ctx context.Context) ([]types.Server, error) {
resources, err := c.GetResources(ctx, defaults.Namespace, types.KindKubeService)
resources, err := GetResourcesWithFilters(ctx, c, proto.ListResourcesRequest{
Namespace: defaults.Namespace,
ResourceType: types.KindKubeService,
})
if err != nil {
// DELETE IN 10.0
// Underlying ListResources for kube service was not available, use fallback.
//
// DELETE IN 11.0.0
if trace.IsNotImplemented(err) {
return c.getKubeServicesFallback(ctx)
}

return nil, trace.Wrap(err)
}

servers := make([]types.Server, len(resources))
for i, resource := range resources {
srv, ok := resource.(types.Server)
if !ok {
return nil, trace.BadParameter("expected Server resource, got %T", resource)
}

servers[i] = srv
servers, err := types.ResourcesWithLabels(resources).AsServers()
if err != nil {
return nil, trace.Wrap(err)
}

return servers, nil
Expand All @@ -919,8 +919,14 @@ func (c *Client) getKubeServicesFallback(ctx context.Context) ([]types.Server, e

// GetApplicationServers returns all registered application servers.
func (c *Client) GetApplicationServers(ctx context.Context, namespace string) ([]types.AppServer, error) {
resources, err := c.GetResources(ctx, namespace, types.KindAppServer)
resources, err := GetResourcesWithFilters(ctx, c, proto.ListResourcesRequest{
Namespace: namespace,
ResourceType: types.KindAppServer,
})
if err != nil {
// Underlying ListResources for app server was not available, use fallback.
//
// DELETE IN 11.0.0
if trace.IsNotImplemented(err) {
servers, err := c.getApplicationServersFallback(ctx, namespace)
if err != nil {
Expand All @@ -933,14 +939,9 @@ func (c *Client) GetApplicationServers(ctx context.Context, namespace string) ([
return nil, trace.Wrap(err)
}

servers := make([]types.AppServer, len(resources))
for i, resource := range resources {
appServer, ok := resource.(types.AppServer)
if !ok {
return nil, trace.BadParameter("expected AppServer resource, got %T", resource)
}

servers[i] = appServer
servers, err := types.ResourcesWithLabels(resources).AsAppServers()
if err != nil {
return nil, trace.Wrap(err)
}

// In addition, we need to fetch legacy application servers.
Expand Down Expand Up @@ -1178,8 +1179,14 @@ func (c *Client) DeleteAllKubeServices(ctx context.Context) error {

// GetDatabaseServers returns all registered database proxy servers.
func (c *Client) GetDatabaseServers(ctx context.Context, namespace string) ([]types.DatabaseServer, error) {
resources, err := c.GetResources(ctx, namespace, types.KindDatabaseServer)
resources, err := GetResourcesWithFilters(ctx, c, proto.ListResourcesRequest{
Namespace: namespace,
ResourceType: types.KindDatabaseServer,
})
if err != nil {
// Underlying ListResources for db server was not available, use fallback.
//
// DELETE IN 11.0.0
if trace.IsNotImplemented(err) {
servers, err := c.getDatabaseServersFallback(ctx, namespace)
if err != nil {
Expand All @@ -1192,14 +1199,9 @@ func (c *Client) GetDatabaseServers(ctx context.Context, namespace string) ([]ty
return nil, trace.Wrap(err)
}

servers := make([]types.DatabaseServer, len(resources))
for i, resource := range resources {
databaseServer, ok := resource.(types.DatabaseServer)
if !ok {
return nil, trace.BadParameter("expected DatabaseServer resource, got %T", resource)
}

servers[i] = databaseServer
servers, err := types.ResourcesWithLabels(resources).AsDatabaseServers()
if err != nil {
return nil, trace.Wrap(err)
}

return servers, nil
Expand Down Expand Up @@ -1624,17 +1626,46 @@ func (c *Client) GetNode(ctx context.Context, namespace, name string) (types.Ser

// GetNodes returns a complete list of nodes that the user has access to in the given namespace.
func (c *Client) GetNodes(ctx context.Context, namespace string) ([]types.Server, error) {
return GetNodesWithLabels(ctx, c, namespace, nil)
resources, err := GetResourcesWithFilters(ctx, c, proto.ListResourcesRequest{
ResourceType: types.KindNode,
Namespace: namespace,
})
if err != nil {
// Underlying ListResources for nodes was not available, use fallback.
//
// DELETE IN 11.0.0
if trace.IsNotImplemented(err) {
servers, err := GetNodesWithLabels(ctx, c, namespace, nil)
if err != nil {
return nil, trace.Wrap(err)
}
return servers, nil
}

return nil, trace.Wrap(err)
}

servers, err := types.ResourcesWithLabels(resources).AsServers()
if err != nil {
return nil, trace.Wrap(err)
}

return servers, nil
}

// NodeClient is an interface used by GetNodesWithLabels to abstract over implementations of
// the ListNodes method.
//
// DELETE IN 11.0.0 with GetNodesWithLabels (used in both api/client/client.go and lib/auth/httpfallback.go)
// replaced by ListResourcesClient
type NodeClient interface {
ListNodes(ctx context.Context, req proto.ListNodesRequest) (nodes []types.Server, nextKey string, err error)
}

// GetNodesWithLabels is a helper for getting a list of nodes with optional label-based filtering. In addition to
// iterating pages, it also correctly handles downsizing pages when LimitExceeded errors are encountered.
//
// DELETE IN 11.0.0 replaced by GetResourcesWithFilters.
func GetNodesWithLabels(ctx context.Context, clt NodeClient, namespace string, labels map[string]string) ([]types.Server, error) {
// Retrieve the complete list of nodes in chunks.
var (
Expand Down Expand Up @@ -2425,25 +2456,37 @@ func (c *Client) ListResources(ctx context.Context, req proto.ListResourcesReque
}, nil
}

// GetResources retrieves all pages from ListResourcesPage and return all
// resources.
func (c *Client) GetResources(ctx context.Context, namespace, resourceType string) ([]types.ResourceWithLabels, error) {
// ListResourcesClient is an interface used by GetResourcesWithFilters to abstract over implementations of
// the ListResources method.
type ListResourcesClient interface {
ListResources(ctx context.Context, req proto.ListResourcesRequest) (*types.ListResourcesResponse, error)
}

// GetResourcesWithFilters is a helper for getting a list of resources with optional filtering. In addition to
// iterating pages, it also correctly handles downsizing pages when LimitExceeded errors are encountered.
func GetResourcesWithFilters(ctx context.Context, clt ListResourcesClient, req proto.ListResourcesRequest) ([]types.ResourceWithLabels, error) {
// Retrieve the complete list of resources in chunks.
var (
resources []types.ResourceWithLabels
startKey string
chunkSize = int32(defaults.DefaultChunkSize)
)

for {
resp, err := c.ListResources(ctx, proto.ListResourcesRequest{
Namespace: namespace,
ResourceType: resourceType,
StartKey: startKey,
Limit: chunkSize,
resp, err := clt.ListResources(ctx, proto.ListResourcesRequest{
Namespace: req.Namespace,
ResourceType: req.ResourceType,
StartKey: startKey,
Limit: chunkSize,
Labels: req.Labels,
SearchKeywords: req.SearchKeywords,
PredicateExpression: req.PredicateExpression,
})
if err != nil {
if trace.IsLimitExceeded(err) {
// Cut chunkSize in half if gRPC max message size is exceeded.
chunkSize = chunkSize / 2
// This is an extremely unlikely scenario, but better to cover it anyways.
if chunkSize == 0 {
return nil, trace.Wrap(trail.FromGRPC(err), "resource is too large to retrieve")
}
Expand Down
8 changes: 6 additions & 2 deletions api/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ func TestGetResources(t *testing.T) {
expectedResources, err := testResources(test.resourceType, defaults.Namespace)
require.NoError(t, err)

// listing everything at once breaks the ListResource.
// Test listing everything at once errors with limit exceeded.
_, err = clt.ListResources(ctx, proto.ListResourcesRequest{
Namespace: defaults.Namespace,
Limit: int32(len(expectedResources)),
Expand All @@ -603,7 +603,11 @@ func TestGetResources(t *testing.T) {
require.Error(t, err)
require.IsType(t, &trace.LimitExceededError{}, err.(*trace.TraceErr).OrigError())

resources, err := clt.GetResources(ctx, defaults.Namespace, test.resourceType)
// Test getting all resources by chunks to handle limit exceeded.
resources, err := GetResourcesWithFilters(ctx, clt, proto.ListResourcesRequest{
Namespace: defaults.Namespace,
ResourceType: test.resourceType,
})
require.NoError(t, err)
require.Len(t, resources, len(expectedResources))
require.Empty(t, cmp.Diff(expectedResources, resources))
Expand Down
7 changes: 3 additions & 4 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2208,7 +2208,7 @@ func trustedClusters(t *testing.T, suite *integrationTestSuite, test trustedClus

// ListNodes expect labels as a value of host
tc.Host = ""
servers, err := tc.ListNodes(ctx)
servers, err := tc.ListNodesWithFilters(ctx)
require.NoError(t, err)
require.Len(t, servers, 2)
tc.Host = Loopback
Expand Down Expand Up @@ -3043,7 +3043,6 @@ func waitForNodeCount(ctx context.Context, t *TeleInstance, clusterName string,
return nil
}
return trace.BadParameter("did not find %v nodes", count)

})
if err != nil {
return trace.Wrap(err)
Expand Down Expand Up @@ -4644,7 +4643,7 @@ func testList(t *testing.T, suite *integrationTestSuite) {

// Get list of nodes and check that the returned nodes match the
// expected nodes.
nodes, err := userClt.ListNodes(context.Background())
nodes, err := userClt.ListNodesWithFilters(context.Background())
require.NoError(t, err)
for _, node := range nodes {
ok := apiutils.SliceContainsStr(tt.outNodes, node.GetHostname())
Expand Down Expand Up @@ -5025,7 +5024,7 @@ func testSSHExitCode(t *testing.T, suite *integrationTestSuite) {
lsPath, err := exec.LookPath("ls")
require.NoError(t, err)

var tests = []struct {
tests := []struct {
desc string
command []string
input string
Expand Down
3 changes: 3 additions & 0 deletions lib/auth/httpfallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,9 @@ type nodeClient interface {

// GetNodesWithLabels is a helper for getting a list of nodes with optional label-based filtering. This is essentially
// a wrapper around client.GetNodesWithLabels that performs fallback on NotImplemented errors.
//
// DELETE IN 11.0.0, this function is only called by lib/client/client.go (*ProxyClient).FindServersByLabels
// which is also marked for deletion (replaced by FindNodesByFilters).
func GetNodesWithLabels(ctx context.Context, clt nodeClient, namespace string, labels map[string]string) ([]types.Server, error) {
nodes, err := client.GetNodesWithLabels(ctx, clt, namespace, labels)
if err == nil || !trace.IsNotImplemented(err) {
Expand Down
Loading

0 comments on commit bf8095d

Please sign in to comment.